Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertArrayEquals;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
22
23 import akka.actor.ActorRef;
24 import akka.dispatch.Dispatchers;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.io.ByteSource;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import java.io.OutputStream;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Optional;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.junit.After;
38 import org.junit.Test;
39 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
40 import org.opendaylight.controller.cluster.raft.MockRaftActor;
41 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
42 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
43 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
44 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
45 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
46 import org.opendaylight.controller.cluster.raft.RaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
48 import org.opendaylight.controller.cluster.raft.RaftVersions;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.VotingState;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
54 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import scala.concurrent.duration.FiniteDuration;
76
77 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
78
79     private final ActorRef followerActor = actorFactory.createActor(
80             MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
81
82     private final ActorRef leaderActor = actorFactory.createActor(
83             MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
84
85     private Follower follower;
86
87     private final short payloadVersion = 5;
88
89     @Override
90     @After
91     public void tearDown() {
92         if (follower != null) {
93             follower.close();
94         }
95
96         super.tearDown();
97     }
98
99     @Override
100     protected Follower createBehavior(final RaftActorContext actorContext) {
101         return spy(new Follower(actorContext));
102     }
103
104     @Override
105     protected  MockRaftActorContext createActorContext() {
106         return createActorContext(followerActor);
107     }
108
109     @Override
110     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
111         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
112         context.setPayloadVersion(payloadVersion);
113         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
114             peerId -> leaderActor.path().toString());
115         return context;
116     }
117
118     @Test
119     public void testThatAnElectionTimeoutIsTriggered() {
120         MockRaftActorContext actorContext = createActorContext();
121         follower = new Follower(actorContext);
122
123         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
124                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
125     }
126
127     @Test
128     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
129         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
130
131         MockRaftActorContext context = createActorContext();
132         follower = new Follower(context);
133
134         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
135                 TimeUnit.MILLISECONDS);
136         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
137
138         assertTrue(raftBehavior instanceof Candidate);
139     }
140
141     @Test
142     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
143         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
144
145         MockRaftActorContext context = createActorContext();
146         ((DefaultConfigParamsImpl) context.getConfigParams())
147                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
148         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
149
150         follower = new Follower(context);
151         context.setCurrentBehavior(follower);
152
153         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
154                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
155         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
156                 -1, -1, (short) 1));
157
158         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
159         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
160         assertTrue(raftBehavior instanceof Follower);
161
162         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
163                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
164         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
165                 -1, -1, (short) 1));
166
167         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
168         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
169         assertTrue(raftBehavior instanceof Follower);
170     }
171
172     @Test
173     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
174         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
175
176         MockRaftActorContext context = createActorContext();
177         long term = 1000;
178         context.getTermInformation().update(term, null);
179
180         follower = createBehavior(context);
181
182         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
183
184         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
185
186         assertEquals("isVoteGranted", true, reply.isVoteGranted());
187         assertEquals("getTerm", term, reply.getTerm());
188         verify(follower).scheduleElection(any(FiniteDuration.class));
189     }
190
191     @Test
192     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
193         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
194
195         MockRaftActorContext context = createActorContext();
196         long term = 1000;
197         context.getTermInformation().update(term, "test");
198
199         follower = createBehavior(context);
200
201         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
202
203         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
204
205         assertEquals("isVoteGranted", false, reply.isVoteGranted());
206         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
207     }
208
209
210     @Test
211     public void testHandleFirstAppendEntries() {
212         logStart("testHandleFirstAppendEntries");
213
214         MockRaftActorContext context = createActorContext();
215         context.getReplicatedLog().clear(0,2);
216         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
217         context.getReplicatedLog().setSnapshotIndex(99);
218
219         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
220
221         assertEquals(1, context.getReplicatedLog().size());
222
223         // The new commitIndex is 101
224         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
225
226         follower = createBehavior(context);
227         follower.handleMessage(leaderActor, appendEntries);
228
229         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
230                 FollowerInitialSyncUpStatus.class);
231         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
232
233         assertFalse(syncStatus.isInitialSyncDone());
234         assertTrue("append entries reply should be true", reply.isSuccess());
235     }
236
237     @Test
238     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
239         logStart("testHandleFirstAppendEntries");
240
241         MockRaftActorContext context = createActorContext();
242
243         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
244
245         // The new commitIndex is 101
246         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
247
248         follower = createBehavior(context);
249         follower.handleMessage(leaderActor, appendEntries);
250
251         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
252                 FollowerInitialSyncUpStatus.class);
253         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
254
255         assertFalse(syncStatus.isInitialSyncDone());
256         assertFalse("append entries reply should be false", reply.isSuccess());
257     }
258
259     @Test
260     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
261         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
262
263         MockRaftActorContext context = createActorContext();
264         context.getReplicatedLog().clear(0,2);
265         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
266         context.getReplicatedLog().setSnapshotIndex(99);
267
268         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
269
270         // The new commitIndex is 101
271         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
272
273         follower = createBehavior(context);
274         follower.handleMessage(leaderActor, appendEntries);
275
276         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
277                 FollowerInitialSyncUpStatus.class);
278         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
279
280         assertFalse(syncStatus.isInitialSyncDone());
281         assertTrue("append entries reply should be true", reply.isSuccess());
282     }
283
284     @Test
285     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
286         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
287
288         MockRaftActorContext context = createActorContext();
289         context.getReplicatedLog().clear(0,2);
290         context.getReplicatedLog().setSnapshotIndex(100);
291
292         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
293
294         // The new commitIndex is 101
295         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
296
297         follower = createBehavior(context);
298         follower.handleMessage(leaderActor, appendEntries);
299
300         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
301                 FollowerInitialSyncUpStatus.class);
302         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
303
304         assertFalse(syncStatus.isInitialSyncDone());
305         assertTrue("append entries reply should be true", reply.isSuccess());
306     }
307
308     @Test
309     public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
310         logStart(
311                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
312
313         MockRaftActorContext context = createActorContext();
314         context.getReplicatedLog().clear(0,2);
315         context.getReplicatedLog().setSnapshotIndex(100);
316
317         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 105, "foo"));
318
319         // The new commitIndex is 101
320         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
321
322         follower = createBehavior(context);
323         follower.handleMessage(leaderActor, appendEntries);
324
325         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
326                 FollowerInitialSyncUpStatus.class);
327         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
328
329         assertFalse(syncStatus.isInitialSyncDone());
330         assertFalse("append entries reply should be false", reply.isSuccess());
331     }
332
333     @Test
334     public void testHandleSyncUpAppendEntries() {
335         logStart("testHandleSyncUpAppendEntries");
336
337         MockRaftActorContext context = createActorContext();
338
339         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
340
341         // The new commitIndex is 101
342         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
343
344         follower = createBehavior(context);
345         follower.handleMessage(leaderActor, appendEntries);
346
347         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
348                 FollowerInitialSyncUpStatus.class);
349
350         assertFalse(syncStatus.isInitialSyncDone());
351
352         // Clear all the messages
353         MessageCollectorActor.clearMessages(followerActor);
354
355         context.setLastApplied(101);
356         context.setCommitIndex(101);
357         setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
358
359         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
360
361         // The new commitIndex is 101
362         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
363         follower.handleMessage(leaderActor, appendEntries);
364
365         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
366
367         assertTrue(syncStatus.isInitialSyncDone());
368
369         MessageCollectorActor.clearMessages(followerActor);
370
371         // Sending the same message again should not generate another message
372
373         follower.handleMessage(leaderActor, appendEntries);
374
375         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
376
377         assertNull(syncStatus);
378     }
379
380     @Test
381     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
382         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
383
384         MockRaftActorContext context = createActorContext();
385
386         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
387
388         // The new commitIndex is 101
389         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
390
391         follower = createBehavior(context);
392         follower.handleMessage(leaderActor, appendEntries);
393
394         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
395                 FollowerInitialSyncUpStatus.class);
396
397         assertFalse(syncStatus.isInitialSyncDone());
398
399         // Clear all the messages
400         MessageCollectorActor.clearMessages(followerActor);
401
402         context.setLastApplied(100);
403         setLastLogEntry(context, 1, 100,
404                 new MockRaftActorContext.MockPayload(""));
405
406         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
407
408         // leader-2 is becoming the leader now and it says the commitIndex is 45
409         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
410         follower.handleMessage(leaderActor, appendEntries);
411
412         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
413
414         // We get a new message saying initial status is not done
415         assertFalse(syncStatus.isInitialSyncDone());
416     }
417
418     @Test
419     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
420         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
421
422         MockRaftActorContext context = createActorContext();
423
424         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
425
426         // The new commitIndex is 101
427         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
428
429         follower = createBehavior(context);
430         follower.handleMessage(leaderActor, appendEntries);
431
432         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
433                 FollowerInitialSyncUpStatus.class);
434
435         assertFalse(syncStatus.isInitialSyncDone());
436
437         // Clear all the messages
438         MessageCollectorActor.clearMessages(followerActor);
439
440         context.setLastApplied(101);
441         context.setCommitIndex(101);
442         setLastLogEntry(context, 1, 101,
443                 new MockRaftActorContext.MockPayload(""));
444
445         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
446
447         // The new commitIndex is 101
448         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
449         follower.handleMessage(leaderActor, appendEntries);
450
451         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
452
453         assertTrue(syncStatus.isInitialSyncDone());
454
455         // Clear all the messages
456         MessageCollectorActor.clearMessages(followerActor);
457
458         context.setLastApplied(100);
459         setLastLogEntry(context, 1, 100,
460                 new MockRaftActorContext.MockPayload(""));
461
462         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
463
464         // leader-2 is becoming the leader now and it says the commitIndex is 45
465         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
466         follower.handleMessage(leaderActor, appendEntries);
467
468         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
469
470         // We get a new message saying initial status is not done
471         assertFalse(syncStatus.isInitialSyncDone());
472     }
473
474     /**
475      * This test verifies that when an AppendEntries RPC is received by a RaftActor
476      * with a commitIndex that is greater than what has been applied to the
477      * state machine of the RaftActor, the RaftActor applies the state and
478      * sets it current applied state to the commitIndex of the sender.
479      */
480     @Test
481     public void testHandleAppendEntriesWithNewerCommitIndex() {
482         logStart("testHandleAppendEntriesWithNewerCommitIndex");
483
484         MockRaftActorContext context = createActorContext();
485
486         context.setLastApplied(100);
487         setLastLogEntry(context, 1, 100,
488                 new MockRaftActorContext.MockPayload(""));
489         context.getReplicatedLog().setSnapshotIndex(99);
490
491         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
492
493         // The new commitIndex is 101
494         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
495
496         follower = createBehavior(context);
497         follower.handleMessage(leaderActor, appendEntries);
498
499         assertEquals("getLastApplied", 101L, context.getLastApplied());
500     }
501
502     /**
503      * This test verifies that when an AppendEntries is received with a prevLogTerm
504      * which does not match the term that is in RaftActors log entry at prevLogIndex
505      * then the RaftActor does not change it's state and it returns a failure.
506      */
507     @Test
508     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
509         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
510
511         MockRaftActorContext context = createActorContext();
512
513         AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, List.of(), 101, -1, (short)0);
514
515         follower = createBehavior(context);
516
517         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
518
519         assertSame(follower, newBehavior);
520
521         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
522                 AppendEntriesReply.class);
523
524         assertEquals("isSuccess", false, reply.isSuccess());
525     }
526
527     @Test
528     public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
529         logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
530
531         MockRaftActorContext context = createActorContext();
532         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
533         context.getReplicatedLog().setSnapshotIndex(4);
534         context.getReplicatedLog().setSnapshotTerm(3);
535
536         AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, List.of(), 8, -1, (short)0);
537
538         follower = createBehavior(context);
539
540         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
541
542         assertSame(follower, newBehavior);
543
544         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
545
546         assertEquals("isSuccess", true, reply.isSuccess());
547     }
548
549     /**
550      * This test verifies that when a new AppendEntries message is received with
551      * new entries and the logs of the sender and receiver match that the new
552      * entries get added to the log and the log is incremented by the number of
553      * entries received in appendEntries.
554      */
555     @Test
556     public void testHandleAppendEntriesAddNewEntries() {
557         logStart("testHandleAppendEntriesAddNewEntries");
558
559         MockRaftActorContext context = createActorContext();
560
561         // First set the receivers term to lower number
562         context.getTermInformation().update(1, "test");
563
564         // Prepare the receivers log
565         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
566         log.append(newReplicatedLogEntry(1, 0, "zero"));
567         log.append(newReplicatedLogEntry(1, 1, "one"));
568         log.append(newReplicatedLogEntry(1, 2, "two"));
569
570         context.setReplicatedLog(log);
571
572         // Prepare the entries to be sent with AppendEntries
573         List<ReplicatedLogEntry> entries = List.of(
574             newReplicatedLogEntry(1, 3, "three"), newReplicatedLogEntry(1, 4, "four"));
575
576         // Send appendEntries with the same term as was set on the receiver
577         // before the new behavior was created (1 in this case)
578         // This will not work for a Candidate because as soon as a Candidate
579         // is created it increments the term
580         short leaderPayloadVersion = 10;
581         String leaderId = "leader-1";
582         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
583
584         follower = createBehavior(context);
585
586         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
587
588         assertSame(follower, newBehavior);
589
590         assertEquals("Next index", 5, log.last().getIndex() + 1);
591         assertEquals("Entry 3", entries.get(0), log.get(3));
592         assertEquals("Entry 4", entries.get(1), log.get(4));
593
594         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
595         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
596
597         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
598     }
599
600     /**
601      * This test verifies that when a new AppendEntries message is received with
602      * new entries and the logs of the sender and receiver are out-of-sync that
603      * the log is first corrected by removing the out of sync entries from the
604      * log and then adding in the new entries sent with the AppendEntries message.
605      */
606     @Test
607     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
608         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
609
610         MockRaftActorContext context = createActorContext();
611
612         // First set the receivers term to lower number
613         context.getTermInformation().update(1, "test");
614
615         // Prepare the receivers log
616         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
617         log.append(newReplicatedLogEntry(1, 0, "zero"));
618         log.append(newReplicatedLogEntry(1, 1, "one"));
619         log.append(newReplicatedLogEntry(1, 2, "two"));
620
621         context.setReplicatedLog(log);
622
623         // Prepare the entries to be sent with AppendEntries
624         List<ReplicatedLogEntry> entries = List.of(
625             newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
626
627         // Send appendEntries with the same term as was set on the receiver
628         // before the new behavior was created (1 in this case)
629         // This will not work for a Candidate because as soon as a Candidate
630         // is created it increments the term
631         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
632
633         follower = createBehavior(context);
634
635         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
636
637         assertSame(follower, newBehavior);
638
639         // The entry at index 2 will be found out-of-sync with the leader
640         // and will be removed
641         // Then the two new entries will be added to the log
642         // Thus making the log to have 4 entries
643         assertEquals("Next index", 4, log.last().getIndex() + 1);
644         //assertEquals("Entry 2", entries.get(0), log.get(2));
645
646         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
647
648         // Check that the entry at index 2 has the new data
649         assertEquals("Entry 2", entries.get(0), log.get(2));
650
651         assertEquals("Entry 3", entries.get(1), log.get(3));
652
653         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
654     }
655
656     @Test
657     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
658         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
659
660         MockRaftActorContext context = createActorContext();
661
662         // First set the receivers term to lower number
663         context.getTermInformation().update(1, "test");
664
665         // Prepare the receivers log
666         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
667         log.append(newReplicatedLogEntry(1, 0, "zero"));
668         log.append(newReplicatedLogEntry(1, 1, "one"));
669         log.append(newReplicatedLogEntry(1, 2, "two"));
670
671         context.setReplicatedLog(log);
672
673         // Prepare the entries to be sent with AppendEntries
674         List<ReplicatedLogEntry> entries = List.of(
675             newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
676
677         // Send appendEntries with the same term as was set on the receiver
678         // before the new behavior was created (1 in this case)
679         // This will not work for a Candidate because as soon as a Candidate
680         // is created it increments the term
681         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
682
683         context.setRaftPolicy(createRaftPolicy(false, true));
684         follower = createBehavior(context);
685
686         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
687
688         assertSame(follower, newBehavior);
689
690         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
691     }
692
693     @Test
694     public void testHandleAppendEntriesPreviousLogEntryMissing() {
695         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
696
697         final MockRaftActorContext context = createActorContext();
698
699         // Prepare the receivers log
700         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
701         log.append(newReplicatedLogEntry(1, 0, "zero"));
702         log.append(newReplicatedLogEntry(1, 1, "one"));
703         log.append(newReplicatedLogEntry(1, 2, "two"));
704
705         context.setReplicatedLog(log);
706
707         // Prepare the entries to be sent with AppendEntries
708         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
709
710         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
711
712         follower = createBehavior(context);
713
714         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
715
716         assertSame(follower, newBehavior);
717
718         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
719     }
720
721     @Test
722     public void testHandleAppendEntriesWithExistingLogEntry() {
723         logStart("testHandleAppendEntriesWithExistingLogEntry");
724
725         MockRaftActorContext context = createActorContext();
726
727         context.getTermInformation().update(1, "test");
728
729         // Prepare the receivers log
730         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
731         log.append(newReplicatedLogEntry(1, 0, "zero"));
732         log.append(newReplicatedLogEntry(1, 1, "one"));
733
734         context.setReplicatedLog(log);
735
736         // Send the last entry again.
737         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 1, "one"));
738
739         follower = createBehavior(context);
740
741         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
742
743         assertEquals("Next index", 2, log.last().getIndex() + 1);
744         assertEquals("Entry 1", entries.get(0), log.get(1));
745
746         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
747
748         // Send the last entry again and also a new one.
749
750         entries = List.of(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
751
752         MessageCollectorActor.clearMessages(leaderActor);
753         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
754
755         assertEquals("Next index", 3, log.last().getIndex() + 1);
756         assertEquals("Entry 1", entries.get(0), log.get(1));
757         assertEquals("Entry 2", entries.get(1), log.get(2));
758
759         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
760     }
761
762     @Test
763     public void testHandleAppendEntriesAfterInstallingSnapshot() {
764         logStart("testHandleAppendAfterInstallingSnapshot");
765
766         MockRaftActorContext context = createActorContext();
767
768         // Prepare the receivers log
769         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
770
771         // Set up a log as if it has been snapshotted
772         log.setSnapshotIndex(3);
773         log.setSnapshotTerm(1);
774
775         context.setReplicatedLog(log);
776
777         // Prepare the entries to be sent with AppendEntries
778         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
779
780         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
781
782         follower = createBehavior(context);
783
784         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
785
786         assertSame(follower, newBehavior);
787
788         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
789     }
790
791     /**
792      * This test verifies that when InstallSnapshot is received by
793      * the follower its applied correctly.
794      */
795     @Test
796     public void testHandleInstallSnapshot() {
797         logStart("testHandleInstallSnapshot");
798
799         MockRaftActorContext context = createActorContext();
800         context.getTermInformation().update(1, "leader");
801
802         follower = createBehavior(context);
803
804         ByteString bsSnapshot = createSnapshot();
805         int offset = 0;
806         int snapshotLength = bsSnapshot.size();
807         int chunkSize = 50;
808         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
809         int lastIncludedIndex = 1;
810         int chunkIndex = 1;
811         InstallSnapshot lastInstallSnapshot = null;
812
813         for (int i = 0; i < totalChunks; i++) {
814             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
815             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
816                     chunkData, chunkIndex, totalChunks);
817             follower.handleMessage(leaderActor, lastInstallSnapshot);
818             offset = offset + 50;
819             lastIncludedIndex++;
820             chunkIndex++;
821         }
822
823         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
824                 ApplySnapshot.class);
825         Snapshot snapshot = applySnapshot.getSnapshot();
826         assertNotNull(lastInstallSnapshot);
827         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
828         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
829                 snapshot.getLastAppliedTerm());
830         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
831                 snapshot.getLastAppliedIndex());
832         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
833         assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
834         assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
835         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
836         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
837         applySnapshot.getCallback().onSuccess();
838
839         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
840                 leaderActor, InstallSnapshotReply.class);
841         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
842
843         chunkIndex = 1;
844         for (InstallSnapshotReply reply: replies) {
845             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
846             assertEquals("getTerm", 1, reply.getTerm());
847             assertEquals("isSuccess", true, reply.isSuccess());
848             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
849         }
850
851         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
852     }
853
854     /**
855      * Verify that when an AppendEntries is sent to a follower during a snapshot install
856      * the Follower short-circuits the processing of the AppendEntries message.
857      */
858     @Test
859     public void testReceivingAppendEntriesDuringInstallSnapshot() {
860         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
861
862         MockRaftActorContext context = createActorContext();
863
864         follower = createBehavior(context);
865
866         ByteString bsSnapshot  = createSnapshot();
867         int snapshotLength = bsSnapshot.size();
868         int chunkSize = 50;
869         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
870         int lastIncludedIndex = 1;
871
872         // Check that snapshot installation is not in progress
873         assertNull(follower.getSnapshotTracker());
874
875         // Make sure that we have more than 1 chunk to send
876         assertTrue(totalChunks > 1);
877
878         // Send an install snapshot with the first chunk to start the process of installing a snapshot
879         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
880         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
881                 chunkData, 1, totalChunks));
882
883         // Check if snapshot installation is in progress now
884         assertNotNull(follower.getSnapshotTracker());
885
886         // Send an append entry
887         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
888                 List.of(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
889
890         follower.handleMessage(leaderActor, appendEntries);
891
892         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
893         assertEquals("isSuccess", true, reply.isSuccess());
894         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
895         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
896         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
897
898         assertNotNull(follower.getSnapshotTracker());
899     }
900
901     @Test
902     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
903         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
904
905         MockRaftActorContext context = createActorContext();
906
907         follower = createBehavior(context);
908
909         ByteString bsSnapshot  = createSnapshot();
910         int snapshotLength = bsSnapshot.size();
911         int chunkSize = 50;
912         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
913         int lastIncludedIndex = 1;
914
915         // Check that snapshot installation is not in progress
916         assertNull(follower.getSnapshotTracker());
917
918         // Make sure that we have more than 1 chunk to send
919         assertTrue(totalChunks > 1);
920
921         // Send an install snapshot with the first chunk to start the process of installing a snapshot
922         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
923         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
924                 chunkData, 1, totalChunks));
925
926         // Check if snapshot installation is in progress now
927         assertNotNull(follower.getSnapshotTracker());
928
929         // Send appendEntries with a new term and leader.
930         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
931                 List.of(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
932
933         follower.handleMessage(leaderActor, appendEntries);
934
935         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
936         assertEquals("isSuccess", true, reply.isSuccess());
937         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
938         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
939         assertEquals("getTerm", 2, reply.getTerm());
940
941         assertNull(follower.getSnapshotTracker());
942     }
943
944     @Test
945     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
946         logStart("testInitialSyncUpWithHandleInstallSnapshot");
947
948         MockRaftActorContext context = createActorContext();
949         context.setCommitIndex(-1);
950
951         follower = createBehavior(context);
952
953         ByteString bsSnapshot  = createSnapshot();
954         int offset = 0;
955         int snapshotLength = bsSnapshot.size();
956         int chunkSize = 50;
957         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
958         int lastIncludedIndex = 1;
959         int chunkIndex = 1;
960         InstallSnapshot lastInstallSnapshot = null;
961
962         for (int i = 0; i < totalChunks; i++) {
963             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
964             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
965                     chunkData, chunkIndex, totalChunks);
966             follower.handleMessage(leaderActor, lastInstallSnapshot);
967             offset = offset + 50;
968             lastIncludedIndex++;
969             chunkIndex++;
970         }
971
972         FollowerInitialSyncUpStatus syncStatus =
973                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
974
975         assertFalse(syncStatus.isInitialSyncDone());
976
977         // Clear all the messages
978         MessageCollectorActor.clearMessages(followerActor);
979
980         context.setLastApplied(101);
981         context.setCommitIndex(101);
982         setLastLogEntry(context, 1, 101,
983                 new MockRaftActorContext.MockPayload(""));
984
985         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
986
987         // The new commitIndex is 101
988         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
989         follower.handleMessage(leaderActor, appendEntries);
990
991         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
992
993         assertTrue(syncStatus.isInitialSyncDone());
994     }
995
996     @Test
997     public void testHandleOutOfSequenceInstallSnapshot() {
998         logStart("testHandleOutOfSequenceInstallSnapshot");
999
1000         MockRaftActorContext context = createActorContext();
1001
1002         follower = createBehavior(context);
1003
1004         ByteString bsSnapshot = createSnapshot();
1005
1006         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1007                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1008         follower.handleMessage(leaderActor, installSnapshot);
1009
1010         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1011                 InstallSnapshotReply.class);
1012
1013         assertEquals("isSuccess", false, reply.isSuccess());
1014         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1015         assertEquals("getTerm", 1, reply.getTerm());
1016         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1017
1018         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1019     }
1020
1021     @Test
1022     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1023         MockRaftActorContext context = createActorContext();
1024
1025         Stopwatch stopwatch = Stopwatch.createStarted();
1026
1027         follower = createBehavior(context);
1028
1029         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1030
1031         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1032
1033         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1034
1035         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1036         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1037     }
1038
1039     @Test
1040     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1041         MockRaftActorContext context = createActorContext();
1042         context.setConfigParams(new DefaultConfigParamsImpl() {
1043             @Override
1044             public FiniteDuration getElectionTimeOutInterval() {
1045                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1046             }
1047         });
1048
1049         context.setRaftPolicy(createRaftPolicy(false, false));
1050
1051         follower = createBehavior(context);
1052
1053         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1054         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1055         assertSame("handleMessage result", follower, newBehavior);
1056     }
1057
1058     @Test
1059     public void testFollowerSchedulesElectionIfNonVoting() {
1060         MockRaftActorContext context = createActorContext();
1061         context.updatePeerIds(new ServerConfigurationPayload(List.of(new ServerInfo(context.getId(), false))));
1062         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1063                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1064         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1065
1066         follower = new Follower(context, "leader", (short)1);
1067
1068         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1069                 ElectionTimeout.class);
1070         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1071         assertSame("handleMessage result", follower, newBehavior);
1072         assertNull("Expected null leaderId", follower.getLeaderId());
1073     }
1074
1075     @Test
1076     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1077         MockRaftActorContext context = createActorContext();
1078         follower = createBehavior(context);
1079         follower.handleMessage(leaderActor, new RaftRPC() {
1080             private static final long serialVersionUID = 1L;
1081
1082             @Override
1083             public long getTerm() {
1084                 return 100;
1085             }
1086         });
1087         verify(follower).scheduleElection(any(FiniteDuration.class));
1088     }
1089
1090     @Test
1091     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1092         MockRaftActorContext context = createActorContext();
1093         follower = createBehavior(context);
1094         follower.handleMessage(leaderActor, "non-raft-rpc");
1095         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1096     }
1097
1098     @Test
1099     public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1100         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1101         logStart(id);
1102
1103         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1104
1105         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1106         config.setSnapshotBatchCount(2);
1107         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1108
1109         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1110         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1111         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1112                 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1113         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1114                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1115         followerRaftActor.set(followerActorRef.underlyingActor());
1116         followerRaftActor.get().waitForInitializeBehaviorComplete();
1117
1118         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1119         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1120         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1121
1122         List<ReplicatedLogEntry> entries = List.of(
1123                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1124
1125         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1126
1127         followerActorRef.tell(appendEntries, leaderActor);
1128
1129         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1130         assertEquals("isSuccess", true, reply.isSuccess());
1131
1132         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1133
1134         InMemoryJournal.waitForDeleteMessagesComplete(id);
1135         InMemoryJournal.waitForWriteMessagesComplete(id);
1136         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1137         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1138         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1139         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1140         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1141         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1142         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1143
1144         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1145         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1146         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1147         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1148         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1149         assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData()),
1150                 MockRaftActor.fromState(snapshot.getState()));
1151     }
1152
1153     @Test
1154     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1155         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1156         logStart(id);
1157
1158         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1159
1160         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1161         config.setSnapshotBatchCount(2);
1162         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1163
1164         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1165         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1166         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1167                 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1168         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1169                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1170         followerRaftActor.set(followerActorRef.underlyingActor());
1171         followerRaftActor.get().waitForInitializeBehaviorComplete();
1172
1173         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1174         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1175         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1176
1177         List<ReplicatedLogEntry> entries = List.of(
1178                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1179                 newReplicatedLogEntry(1, 2, "three"));
1180
1181         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1182
1183         followerActorRef.tell(appendEntries, leaderActor);
1184
1185         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1186         assertEquals("isSuccess", true, reply.isSuccess());
1187
1188         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1189
1190         InMemoryJournal.waitForDeleteMessagesComplete(id);
1191         InMemoryJournal.waitForWriteMessagesComplete(id);
1192         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1193         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1194         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1195         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1196         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1197         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1198         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1199
1200         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1201         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1202         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1203         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1204         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1205         assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData(),
1206                 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1207
1208         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1209         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1210
1211         // Reinstate the actor from persistence
1212
1213         actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1214
1215         followerActorRef = actorFactory.createTestActor(builder.props()
1216                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1217         followerRaftActor.set(followerActorRef.underlyingActor());
1218         followerRaftActor.get().waitForInitializeBehaviorComplete();
1219
1220         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1221         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1222         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1223         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1224         assertEquals("State", List.of(entries.get(0).getData(), entries.get(1).getData(),
1225                 entries.get(2).getData()), followerRaftActor.get().getState());
1226     }
1227
1228     @Test
1229     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1230         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1231         logStart(id);
1232
1233         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1234
1235         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1236         config.setSnapshotBatchCount(1);
1237         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1238
1239         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1240         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1241         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1242                 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1243         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1244                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1245         followerRaftActor.set(followerActorRef.underlyingActor());
1246         followerRaftActor.get().waitForInitializeBehaviorComplete();
1247
1248         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1249         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1250         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1251
1252         List<ReplicatedLogEntry> entries = List.of(
1253                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1254                 newReplicatedLogEntry(1, 2, "three"));
1255
1256         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1257
1258         followerActorRef.tell(appendEntries, leaderActor);
1259
1260         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1261         assertEquals("isSuccess", true, reply.isSuccess());
1262
1263         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1264
1265         InMemoryJournal.waitForDeleteMessagesComplete(id);
1266         InMemoryJournal.waitForWriteMessagesComplete(id);
1267         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1268         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1269         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1270         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1271         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1272         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1273         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1274
1275         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1276         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1277         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1278         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1279         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1280         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1281         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1282         assertEquals("Snapshot state", List.of(entries.get(0).getData()),
1283                 MockRaftActor.fromState(snapshot.getState()));
1284     }
1285
1286     @Test
1287     public void testNeedsLeaderAddress() {
1288         logStart("testNeedsLeaderAddress");
1289
1290         MockRaftActorContext context = createActorContext();
1291         context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
1292         context.addToPeers("leader", null, VotingState.VOTING);
1293         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1294
1295         follower = createBehavior(context);
1296
1297         follower.handleMessage(leaderActor,
1298                 new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short)0));
1299
1300         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1301         assertTrue(reply.isNeedsLeaderAddress());
1302         MessageCollectorActor.clearMessages(leaderActor);
1303
1304         PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1305         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
1306
1307         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1,
1308                 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1309
1310         reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1311         assertFalse(reply.isNeedsLeaderAddress());
1312
1313         verify(mockResolver).setResolved("leader", leaderActor.path().toString());
1314     }
1315
1316     @SuppressWarnings("checkstyle:IllegalCatch")
1317     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1318             final AtomicReference<MockRaftActor> followerRaftActor) {
1319         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1320             @Override
1321             public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1322                 try {
1323                     actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1324                             installSnapshotStream), actorRef);
1325                 } catch (RuntimeException e) {
1326                     throw e;
1327                 } catch (Exception e) {
1328                     throw new RuntimeException(e);
1329                 }
1330             }
1331
1332             @Override
1333             public void applySnapshot(final State snapshotState) {
1334             }
1335
1336             @Override
1337             public State deserializeSnapshot(final ByteSource snapshotBytes) {
1338                 throw new UnsupportedOperationException();
1339             }
1340         };
1341         return snapshotCohort;
1342     }
1343
1344     public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1345         int snapshotLength = bs.size();
1346         int start = offset;
1347         int size = chunkSize;
1348         if (chunkSize > snapshotLength) {
1349             size = snapshotLength;
1350         } else if (start + chunkSize > snapshotLength) {
1351             size = snapshotLength - start;
1352         }
1353
1354         byte[] nextChunk = new byte[size];
1355         bs.copyTo(nextChunk, start, 0, size);
1356         return nextChunk;
1357     }
1358
1359     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1360             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1361         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1362     }
1363
1364     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1365             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1366             final boolean expForceInstallSnapshot) {
1367
1368         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1369                 AppendEntriesReply.class);
1370
1371         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1372         assertEquals("getTerm", expTerm, reply.getTerm());
1373         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1374         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1375         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1376         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1377         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1378         assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
1379     }
1380
1381
1382     private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1383         return new SimpleReplicatedLogEntry(index, term,
1384                 new MockRaftActorContext.MockPayload(data));
1385     }
1386
1387     private ByteString createSnapshot() {
1388         return toByteString(Map.of("1", "A", "2", "B", "3", "C"));
1389     }
1390
1391     @Override
1392     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1393             final ActorRef actorRef, final RaftRPC rpc) {
1394         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1395
1396         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1397         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1398     }
1399
1400     @Override
1401     protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1402         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1403         assertEquals("isSuccess", true, reply.isSuccess());
1404     }
1405 }