1c7b649d8b5c376f2fd4cf396705c9db00f4e484
[controller.git] /
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertArrayEquals;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
22
23 import com.google.common.base.Stopwatch;
24 import com.google.common.io.ByteSource;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.io.OutputStream;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Optional;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.apache.pekko.actor.ActorRef;
33 import org.apache.pekko.dispatch.Dispatchers;
34 import org.apache.pekko.protobuf.ByteString;
35 import org.apache.pekko.testkit.TestActorRef;
36 import org.apache.pekko.testkit.javadsl.TestKit;
37 import org.junit.After;
38 import org.junit.Test;
39 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
40 import org.opendaylight.controller.cluster.raft.MockRaftActor;
41 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
42 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
43 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
44 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
45 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
46 import org.opendaylight.controller.cluster.raft.RaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
48 import org.opendaylight.controller.cluster.raft.RaftVersions;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.VotingState;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
54 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
72 import org.opendaylight.controller.cluster.raft.spi.TermInfo;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import scala.concurrent.duration.FiniteDuration;
77
78 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
    // Payload version applied to every actor context built by createActorContext(ActorRef).
    private final short payloadVersion = 5;

    // Message collector actor that is handed to the actor context as the follower's own actor.
    private final ActorRef followerActor = actorFactory.createActor(
            MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
    // Message collector actor used as the sender of test messages; its path is also what the
    // peer address resolver installed by createActorContext(ActorRef) returns for any peer id.
    private final ActorRef leaderActor = actorFactory.createActor(
            MessageCollectorActor.props(), actorFactory.generateActorId("leader"));

    // Behavior under test; assigned by the individual tests and closed in tearDown() when non-null.
    private Follower follower;
87
88     @Override
89     @After
90     public void tearDown() {
91         if (follower != null) {
92             follower.close();
93         }
94
95         super.tearDown();
96     }
97
98     @Override
99     protected Follower createBehavior(final RaftActorContext actorContext) {
100         return spy(new Follower(actorContext));
101     }
102
103     @Override
104     protected  MockRaftActorContext createActorContext() {
105         return createActorContext(followerActor);
106     }
107
108     @Override
109     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
110         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
111         context.setPayloadVersion(payloadVersion);
112         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
113             peerId -> leaderActor.path().toString());
114         return context;
115     }
116
117     @Test
118     public void testThatAnElectionTimeoutIsTriggered() {
119         MockRaftActorContext actorContext = createActorContext();
120         follower = new Follower(actorContext);
121
122         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
123                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
124     }
125
126     @Test
127     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
128         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
129
130         MockRaftActorContext context = createActorContext();
131         follower = new Follower(context);
132
133         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
134                 TimeUnit.MILLISECONDS);
135         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
136
137         assertTrue(raftBehavior instanceof Candidate);
138     }
139
140     @Test
141     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
142         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
143
144         MockRaftActorContext context = createActorContext();
145         ((DefaultConfigParamsImpl) context.getConfigParams())
146                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
147         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
148
149         follower = new Follower(context);
150         context.setCurrentBehavior(follower);
151
152         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
153                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
154         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
155                 -1, -1, (short) 1));
156
157         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
158         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
159         assertTrue(raftBehavior instanceof Follower);
160
161         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
162                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
163         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
164                 -1, -1, (short) 1));
165
166         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
167         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
168         assertTrue(raftBehavior instanceof Follower);
169     }
170
171     @Test
172     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
173         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
174
175         MockRaftActorContext context = createActorContext();
176         long term = 1000;
177         context.setTermInfo(new TermInfo(term, null));
178
179         follower = createBehavior(context);
180
181         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
182
183         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
184
185         assertEquals("isVoteGranted", true, reply.isVoteGranted());
186         assertEquals("getTerm", term, reply.getTerm());
187         verify(follower).scheduleElection(any(FiniteDuration.class));
188     }
189
190     @Test
191     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
192         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
193
194         MockRaftActorContext context = createActorContext();
195         long term = 1000;
196         context.setTermInfo(new TermInfo(term, "test"));
197
198         follower = createBehavior(context);
199
200         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
201
202         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
203
204         assertEquals("isVoteGranted", false, reply.isVoteGranted());
205         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
206     }
207
208
209     @Test
210     public void testHandleFirstAppendEntries() {
211         logStart("testHandleFirstAppendEntries");
212
213         MockRaftActorContext context = createActorContext();
214         context.getReplicatedLog().clear(0,2);
215         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
216         context.getReplicatedLog().setSnapshotIndex(99);
217
218         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
219
220         assertEquals(1, context.getReplicatedLog().size());
221
222         // The new commitIndex is 101
223         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
224
225         follower = createBehavior(context);
226         follower.handleMessage(leaderActor, appendEntries);
227
228         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
229                 FollowerInitialSyncUpStatus.class);
230         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
231
232         assertFalse(syncStatus.isInitialSyncDone());
233         assertTrue("append entries reply should be true", reply.isSuccess());
234     }
235
236     @Test
237     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
238         logStart("testHandleFirstAppendEntries");
239
240         MockRaftActorContext context = createActorContext();
241
242         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
243
244         // The new commitIndex is 101
245         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
246
247         follower = createBehavior(context);
248         follower.handleMessage(leaderActor, appendEntries);
249
250         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
251                 FollowerInitialSyncUpStatus.class);
252         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
253
254         assertFalse(syncStatus.isInitialSyncDone());
255         assertFalse("append entries reply should be false", reply.isSuccess());
256     }
257
258     @Test
259     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
260         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
261
262         MockRaftActorContext context = createActorContext();
263         context.getReplicatedLog().clear(0,2);
264         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
265         context.getReplicatedLog().setSnapshotIndex(99);
266
267         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
268
269         // The new commitIndex is 101
270         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
271
272         follower = createBehavior(context);
273         follower.handleMessage(leaderActor, appendEntries);
274
275         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
276                 FollowerInitialSyncUpStatus.class);
277         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
278
279         assertFalse(syncStatus.isInitialSyncDone());
280         assertTrue("append entries reply should be true", reply.isSuccess());
281     }
282
283     @Test
284     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
285         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
286
287         MockRaftActorContext context = createActorContext();
288         context.getReplicatedLog().clear(0,2);
289         context.getReplicatedLog().setSnapshotIndex(100);
290
291         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
292
293         // The new commitIndex is 101
294         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
295
296         follower = createBehavior(context);
297         follower.handleMessage(leaderActor, appendEntries);
298
299         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
300                 FollowerInitialSyncUpStatus.class);
301         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
302
303         assertFalse(syncStatus.isInitialSyncDone());
304         assertTrue("append entries reply should be true", reply.isSuccess());
305     }
306
307     @Test
308     public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
309         logStart(
310                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
311
312         MockRaftActorContext context = createActorContext();
313         context.getReplicatedLog().clear(0,2);
314         context.getReplicatedLog().setSnapshotIndex(100);
315
316         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 105, "foo"));
317
318         // The new commitIndex is 101
319         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
320
321         follower = createBehavior(context);
322         follower.handleMessage(leaderActor, appendEntries);
323
324         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
325                 FollowerInitialSyncUpStatus.class);
326         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
327
328         assertFalse(syncStatus.isInitialSyncDone());
329         assertFalse("append entries reply should be false", reply.isSuccess());
330     }
331
332     @Test
333     public void testHandleSyncUpAppendEntries() {
334         logStart("testHandleSyncUpAppendEntries");
335
336         MockRaftActorContext context = createActorContext();
337
338         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
339
340         // The new commitIndex is 101
341         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
342
343         follower = createBehavior(context);
344         follower.handleMessage(leaderActor, appendEntries);
345
346         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
347                 FollowerInitialSyncUpStatus.class);
348
349         assertFalse(syncStatus.isInitialSyncDone());
350
351         // Clear all the messages
352         MessageCollectorActor.clearMessages(followerActor);
353
354         context.setLastApplied(101);
355         context.setCommitIndex(101);
356         setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
357
358         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
359
360         // The new commitIndex is 101
361         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
362         follower.handleMessage(leaderActor, appendEntries);
363
364         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
365
366         assertTrue(syncStatus.isInitialSyncDone());
367
368         MessageCollectorActor.clearMessages(followerActor);
369
370         // Sending the same message again should not generate another message
371
372         follower.handleMessage(leaderActor, appendEntries);
373
374         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
375
376         assertNull(syncStatus);
377     }
378
379     @Test
380     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
381         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
382
383         MockRaftActorContext context = createActorContext();
384
385         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
386
387         // The new commitIndex is 101
388         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
389
390         follower = createBehavior(context);
391         follower.handleMessage(leaderActor, appendEntries);
392
393         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
394                 FollowerInitialSyncUpStatus.class);
395
396         assertFalse(syncStatus.isInitialSyncDone());
397
398         // Clear all the messages
399         MessageCollectorActor.clearMessages(followerActor);
400
401         context.setLastApplied(100);
402         setLastLogEntry(context, 1, 100,
403                 new MockRaftActorContext.MockPayload(""));
404
405         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
406
407         // leader-2 is becoming the leader now and it says the commitIndex is 45
408         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
409         follower.handleMessage(leaderActor, appendEntries);
410
411         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
412
413         // We get a new message saying initial status is not done
414         assertFalse(syncStatus.isInitialSyncDone());
415     }
416
417     @Test
418     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
419         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
420
421         MockRaftActorContext context = createActorContext();
422
423         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
424
425         // The new commitIndex is 101
426         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
427
428         follower = createBehavior(context);
429         follower.handleMessage(leaderActor, appendEntries);
430
431         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
432                 FollowerInitialSyncUpStatus.class);
433
434         assertFalse(syncStatus.isInitialSyncDone());
435
436         // Clear all the messages
437         MessageCollectorActor.clearMessages(followerActor);
438
439         context.setLastApplied(101);
440         context.setCommitIndex(101);
441         setLastLogEntry(context, 1, 101,
442                 new MockRaftActorContext.MockPayload(""));
443
444         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
445
446         // The new commitIndex is 101
447         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
448         follower.handleMessage(leaderActor, appendEntries);
449
450         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
451
452         assertTrue(syncStatus.isInitialSyncDone());
453
454         // Clear all the messages
455         MessageCollectorActor.clearMessages(followerActor);
456
457         context.setLastApplied(100);
458         setLastLogEntry(context, 1, 100,
459                 new MockRaftActorContext.MockPayload(""));
460
461         entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
462
463         // leader-2 is becoming the leader now and it says the commitIndex is 45
464         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
465         follower.handleMessage(leaderActor, appendEntries);
466
467         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
468
469         // We get a new message saying initial status is not done
470         assertFalse(syncStatus.isInitialSyncDone());
471     }
472
473     /**
474      * This test verifies that when an AppendEntries RPC is received by a RaftActor
475      * with a commitIndex that is greater than what has been applied to the
476      * state machine of the RaftActor, the RaftActor applies the state and
477      * sets it current applied state to the commitIndex of the sender.
478      */
479     @Test
480     public void testHandleAppendEntriesWithNewerCommitIndex() {
481         logStart("testHandleAppendEntriesWithNewerCommitIndex");
482
483         MockRaftActorContext context = createActorContext();
484
485         context.setLastApplied(100);
486         setLastLogEntry(context, 1, 100,
487                 new MockRaftActorContext.MockPayload(""));
488         context.getReplicatedLog().setSnapshotIndex(99);
489
490         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
491
492         // The new commitIndex is 101
493         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
494
495         follower = createBehavior(context);
496         follower.handleMessage(leaderActor, appendEntries);
497
498         assertEquals("getLastApplied", 101L, context.getLastApplied());
499     }
500
501     /**
502      * This test verifies that when an AppendEntries is received with a prevLogTerm
503      * which does not match the term that is in RaftActors log entry at prevLogIndex
504      * then the RaftActor does not change it's state and it returns a failure.
505      */
506     @Test
507     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
508         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
509
510         MockRaftActorContext context = createActorContext();
511
512         AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, List.of(), 101, -1, (short)0);
513
514         follower = createBehavior(context);
515
516         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
517
518         assertSame(follower, newBehavior);
519
520         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
521                 AppendEntriesReply.class);
522
523         assertEquals("isSuccess", false, reply.isSuccess());
524     }
525
526     @Test
527     public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
528         logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
529
530         MockRaftActorContext context = createActorContext();
531         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
532         context.getReplicatedLog().setSnapshotIndex(4);
533         context.getReplicatedLog().setSnapshotTerm(3);
534
535         AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, List.of(), 8, -1, (short)0);
536
537         follower = createBehavior(context);
538
539         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
540
541         assertSame(follower, newBehavior);
542
543         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
544
545         assertEquals("isSuccess", true, reply.isSuccess());
546     }
547
548     /**
549      * This test verifies that when a new AppendEntries message is received with
550      * new entries and the logs of the sender and receiver match that the new
551      * entries get added to the log and the log is incremented by the number of
552      * entries received in appendEntries.
553      */
554     @Test
555     public void testHandleAppendEntriesAddNewEntries() {
556         logStart("testHandleAppendEntriesAddNewEntries");
557
558         MockRaftActorContext context = createActorContext();
559
560         // First set the receivers term to lower number
561         context.setTermInfo(new TermInfo(1, "test"));
562
563         // Prepare the receivers log
564         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
565         log.append(newReplicatedLogEntry(1, 0, "zero"));
566         log.append(newReplicatedLogEntry(1, 1, "one"));
567         log.append(newReplicatedLogEntry(1, 2, "two"));
568
569         context.setReplicatedLog(log);
570
571         // Prepare the entries to be sent with AppendEntries
572         List<ReplicatedLogEntry> entries = List.of(
573             newReplicatedLogEntry(1, 3, "three"), newReplicatedLogEntry(1, 4, "four"));
574
575         // Send appendEntries with the same term as was set on the receiver
576         // before the new behavior was created (1 in this case)
577         // This will not work for a Candidate because as soon as a Candidate
578         // is created it increments the term
579         short leaderPayloadVersion = 10;
580         String leaderId = "leader-1";
581         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
582
583         follower = createBehavior(context);
584
585         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
586
587         assertSame(follower, newBehavior);
588
589         assertEquals("Next index", 5, log.last().index() + 1);
590         assertEquals("Entry 3", entries.get(0), log.get(3));
591         assertEquals("Entry 4", entries.get(1), log.get(4));
592
593         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
594         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
595
596         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
597     }
598
599     /**
600      * This test verifies that when a new AppendEntries message is received with
601      * new entries and the logs of the sender and receiver are out-of-sync that
602      * the log is first corrected by removing the out of sync entries from the
603      * log and then adding in the new entries sent with the AppendEntries message.
604      */
605     @Test
606     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
607         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
608
609         MockRaftActorContext context = createActorContext();
610
611         // First set the receivers term to lower number
612         context.setTermInfo(new TermInfo(1, "test"));
613
614         // Prepare the receivers log
615         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
616         log.append(newReplicatedLogEntry(1, 0, "zero"));
617         log.append(newReplicatedLogEntry(1, 1, "one"));
618         log.append(newReplicatedLogEntry(1, 2, "two"));
619
620         context.setReplicatedLog(log);
621
622         // Prepare the entries to be sent with AppendEntries
623         List<ReplicatedLogEntry> entries = List.of(
624             newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
625
626         // Send appendEntries with the same term as was set on the receiver
627         // before the new behavior was created (1 in this case)
628         // This will not work for a Candidate because as soon as a Candidate
629         // is created it increments the term
630         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
631
632         follower = createBehavior(context);
633
634         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
635
636         assertSame(follower, newBehavior);
637
638         // The entry at index 2 will be found out-of-sync with the leader
639         // and will be removed
640         // Then the two new entries will be added to the log
641         // Thus making the log to have 4 entries
642         assertEquals("Next index", 4, log.last().index() + 1);
643         //assertEquals("Entry 2", entries.get(0), log.get(2));
644
645         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
646
647         // Check that the entry at index 2 has the new data
648         assertEquals("Entry 2", entries.get(0), log.get(2));
649
650         assertEquals("Entry 3", entries.get(1), log.get(3));
651
652         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
653     }
654
655     @Test
656     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
657         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
658
659         MockRaftActorContext context = createActorContext();
660
661         // First set the receivers term to lower number
662         context.setTermInfo(new TermInfo(1, "test"));
663
664         // Prepare the receivers log
665         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
666         log.append(newReplicatedLogEntry(1, 0, "zero"));
667         log.append(newReplicatedLogEntry(1, 1, "one"));
668         log.append(newReplicatedLogEntry(1, 2, "two"));
669
670         context.setReplicatedLog(log);
671
672         // Prepare the entries to be sent with AppendEntries
673         List<ReplicatedLogEntry> entries = List.of(
674             newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
675
676         // Send appendEntries with the same term as was set on the receiver
677         // before the new behavior was created (1 in this case)
678         // This will not work for a Candidate because as soon as a Candidate
679         // is created it increments the term
680         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
681
682         context.setRaftPolicy(createRaftPolicy(false, true));
683         follower = createBehavior(context);
684
685         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
686
687         assertSame(follower, newBehavior);
688
689         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
690     }
691
692     @Test
693     public void testHandleAppendEntriesPreviousLogEntryMissing() {
694         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
695
696         final MockRaftActorContext context = createActorContext();
697
698         // Prepare the receivers log
699         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
700         log.append(newReplicatedLogEntry(1, 0, "zero"));
701         log.append(newReplicatedLogEntry(1, 1, "one"));
702         log.append(newReplicatedLogEntry(1, 2, "two"));
703
704         context.setReplicatedLog(log);
705
706         // Prepare the entries to be sent with AppendEntries
707         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
708
709         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
710
711         follower = createBehavior(context);
712
713         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
714
715         assertSame(follower, newBehavior);
716
717         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
718     }
719
720     @Test
721     public void testHandleAppendEntriesWithExistingLogEntry() {
722         logStart("testHandleAppendEntriesWithExistingLogEntry");
723
724         MockRaftActorContext context = createActorContext();
725
726         context.setTermInfo(new TermInfo(1, "test"));
727
728         // Prepare the receivers log
729         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
730         log.append(newReplicatedLogEntry(1, 0, "zero"));
731         log.append(newReplicatedLogEntry(1, 1, "one"));
732
733         context.setReplicatedLog(log);
734
735         // Send the last entry again.
736         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 1, "one"));
737
738         follower = createBehavior(context);
739
740         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
741
742         assertEquals("Next index", 2, log.last().index() + 1);
743         assertEquals("Entry 1", entries.get(0), log.get(1));
744
745         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
746
747         // Send the last entry again and also a new one.
748
749         entries = List.of(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
750
751         MessageCollectorActor.clearMessages(leaderActor);
752         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
753
754         assertEquals("Next index", 3, log.last().index() + 1);
755         assertEquals("Entry 1", entries.get(0), log.get(1));
756         assertEquals("Entry 2", entries.get(1), log.get(2));
757
758         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
759     }
760
761     @Test
762     public void testHandleAppendEntriesAfterInstallingSnapshot() {
763         logStart("testHandleAppendAfterInstallingSnapshot");
764
765         MockRaftActorContext context = createActorContext();
766
767         // Prepare the receivers log
768         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
769
770         // Set up a log as if it has been snapshotted
771         log.setSnapshotIndex(3);
772         log.setSnapshotTerm(1);
773
774         context.setReplicatedLog(log);
775
776         // Prepare the entries to be sent with AppendEntries
777         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
778
779         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
780
781         follower = createBehavior(context);
782
783         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
784
785         assertSame(follower, newBehavior);
786
787         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
788     }
789
790     /**
791      * This test verifies that when InstallSnapshot is received by
792      * the follower its applied correctly.
793      */
794     @Test
795     public void testHandleInstallSnapshot() {
796         logStart("testHandleInstallSnapshot");
797
798         MockRaftActorContext context = createActorContext();
799         context.setTermInfo(new TermInfo(1, "leader"));
800
801         follower = createBehavior(context);
802
803         ByteString bsSnapshot = createSnapshot();
804         int offset = 0;
805         int snapshotLength = bsSnapshot.size();
806         int chunkSize = 50;
807         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
808         int lastIncludedIndex = 1;
809         int chunkIndex = 1;
810         InstallSnapshot lastInstallSnapshot = null;
811
812         for (int i = 0; i < totalChunks; i++) {
813             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
814             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
815                     chunkData, chunkIndex, totalChunks);
816             follower.handleMessage(leaderActor, lastInstallSnapshot);
817             offset = offset + 50;
818             lastIncludedIndex++;
819             chunkIndex++;
820         }
821
822         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
823                 ApplySnapshot.class);
824         Snapshot snapshot = applySnapshot.getSnapshot();
825         assertNotNull(lastInstallSnapshot);
826         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
827         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
828                 snapshot.getLastAppliedTerm());
829         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
830                 snapshot.getLastAppliedIndex());
831         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
832         assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
833         assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
834         assertEquals(new TermInfo(1, "leader"), snapshot.termInfo());
835         applySnapshot.getCallback().onSuccess();
836
837         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
838                 leaderActor, InstallSnapshotReply.class);
839         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
840
841         chunkIndex = 1;
842         for (InstallSnapshotReply reply: replies) {
843             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
844             assertEquals("getTerm", 1, reply.getTerm());
845             assertEquals("isSuccess", true, reply.isSuccess());
846             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
847         }
848
849         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
850     }
851
852     /**
853      * Verify that when an AppendEntries is sent to a follower during a snapshot install
854      * the Follower short-circuits the processing of the AppendEntries message.
855      */
856     @Test
857     public void testReceivingAppendEntriesDuringInstallSnapshot() {
858         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
859
860         MockRaftActorContext context = createActorContext();
861
862         follower = createBehavior(context);
863
864         ByteString bsSnapshot  = createSnapshot();
865         int snapshotLength = bsSnapshot.size();
866         int chunkSize = 50;
867         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
868         int lastIncludedIndex = 1;
869
870         // Check that snapshot installation is not in progress
871         assertNull(follower.getSnapshotTracker());
872
873         // Make sure that we have more than 1 chunk to send
874         assertTrue(totalChunks > 1);
875
876         // Send an install snapshot with the first chunk to start the process of installing a snapshot
877         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
878         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
879                 chunkData, 1, totalChunks));
880
881         // Check if snapshot installation is in progress now
882         assertNotNull(follower.getSnapshotTracker());
883
884         // Send an append entry
885         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
886                 List.of(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
887
888         follower.handleMessage(leaderActor, appendEntries);
889
890         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
891         assertEquals("isSuccess", true, reply.isSuccess());
892         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
893         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
894         assertEquals("getTerm", context.currentTerm(), reply.getTerm());
895
896         assertNotNull(follower.getSnapshotTracker());
897     }
898
899     @Test
900     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
901         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
902
903         MockRaftActorContext context = createActorContext();
904
905         follower = createBehavior(context);
906
907         ByteString bsSnapshot  = createSnapshot();
908         int snapshotLength = bsSnapshot.size();
909         int chunkSize = 50;
910         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
911         int lastIncludedIndex = 1;
912
913         // Check that snapshot installation is not in progress
914         assertNull(follower.getSnapshotTracker());
915
916         // Make sure that we have more than 1 chunk to send
917         assertTrue(totalChunks > 1);
918
919         // Send an install snapshot with the first chunk to start the process of installing a snapshot
920         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
921         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
922                 chunkData, 1, totalChunks));
923
924         // Check if snapshot installation is in progress now
925         assertNotNull(follower.getSnapshotTracker());
926
927         // Send appendEntries with a new term and leader.
928         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
929                 List.of(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
930
931         follower.handleMessage(leaderActor, appendEntries);
932
933         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
934         assertEquals("isSuccess", true, reply.isSuccess());
935         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
936         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
937         assertEquals("getTerm", 2, reply.getTerm());
938
939         assertNull(follower.getSnapshotTracker());
940     }
941
942     @Test
943     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
944         logStart("testInitialSyncUpWithHandleInstallSnapshot");
945
946         MockRaftActorContext context = createActorContext();
947         context.setCommitIndex(-1);
948
949         follower = createBehavior(context);
950
951         ByteString bsSnapshot  = createSnapshot();
952         int offset = 0;
953         int snapshotLength = bsSnapshot.size();
954         int chunkSize = 50;
955         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
956         int lastIncludedIndex = 1;
957         int chunkIndex = 1;
958         InstallSnapshot lastInstallSnapshot = null;
959
960         for (int i = 0; i < totalChunks; i++) {
961             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
962             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
963                     chunkData, chunkIndex, totalChunks);
964             follower.handleMessage(leaderActor, lastInstallSnapshot);
965             offset = offset + 50;
966             lastIncludedIndex++;
967             chunkIndex++;
968         }
969
970         FollowerInitialSyncUpStatus syncStatus =
971                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
972
973         assertFalse(syncStatus.isInitialSyncDone());
974
975         // Clear all the messages
976         MessageCollectorActor.clearMessages(followerActor);
977
978         context.setLastApplied(101);
979         context.setCommitIndex(101);
980         setLastLogEntry(context, 1, 101,
981                 new MockRaftActorContext.MockPayload(""));
982
983         List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
984
985         // The new commitIndex is 101
986         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
987         follower.handleMessage(leaderActor, appendEntries);
988
989         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
990
991         assertTrue(syncStatus.isInitialSyncDone());
992     }
993
994     @Test
995     public void testHandleOutOfSequenceInstallSnapshot() {
996         logStart("testHandleOutOfSequenceInstallSnapshot");
997
998         MockRaftActorContext context = createActorContext();
999
1000         follower = createBehavior(context);
1001
1002         ByteString bsSnapshot = createSnapshot();
1003
1004         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1005                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1006         follower.handleMessage(leaderActor, installSnapshot);
1007
1008         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1009                 InstallSnapshotReply.class);
1010
1011         assertEquals("isSuccess", false, reply.isSuccess());
1012         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1013         assertEquals("getTerm", 1, reply.getTerm());
1014         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1015
1016         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1017     }
1018
1019     @Test
1020     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1021         MockRaftActorContext context = createActorContext();
1022
1023         Stopwatch stopwatch = Stopwatch.createStarted();
1024
1025         follower = createBehavior(context);
1026
1027         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1028
1029         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1030
1031         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1032
1033         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1034         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1035     }
1036
1037     @Test
1038     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1039         MockRaftActorContext context = createActorContext();
1040         context.setConfigParams(new DefaultConfigParamsImpl() {
1041             @Override
1042             public FiniteDuration getElectionTimeOutInterval() {
1043                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1044             }
1045         });
1046
1047         context.setRaftPolicy(createRaftPolicy(false, false));
1048
1049         follower = createBehavior(context);
1050
1051         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1052         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1053         assertSame("handleMessage result", follower, newBehavior);
1054     }
1055
1056     @Test
1057     public void testFollowerSchedulesElectionIfNonVoting() {
1058         MockRaftActorContext context = createActorContext();
1059         context.updatePeerIds(new ServerConfigurationPayload(List.of(new ServerInfo(context.getId(), false))));
1060         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1061                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1062         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1063
1064         follower = new Follower(context, "leader", (short)1);
1065
1066         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1067                 ElectionTimeout.class);
1068         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1069         assertSame("handleMessage result", follower, newBehavior);
1070         assertNull("Expected null leaderId", follower.getLeaderId());
1071     }
1072
1073     @Test
1074     // TODO: parameterized with all possible RaftRPCs
1075     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1076         MockRaftActorContext context = createActorContext();
1077         follower = createBehavior(context);
1078         follower.handleMessage(leaderActor, new RequestVoteReply(100, false));
1079         verify(follower).scheduleElection(any(FiniteDuration.class));
1080     }
1081
1082     @Test
1083     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1084         MockRaftActorContext context = createActorContext();
1085         follower = createBehavior(context);
1086         follower.handleMessage(leaderActor, "non-raft-rpc");
1087         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1088     }
1089
1090     @Test
1091     public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1092         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1093         logStart(id);
1094
1095         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1096
1097         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1098         config.setSnapshotBatchCount(2);
1099         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1100
1101         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1102         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1103         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1104                 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1105         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1106                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1107         followerRaftActor.set(followerActorRef.underlyingActor());
1108         followerRaftActor.get().waitForInitializeBehaviorComplete();
1109
1110         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1111         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1112         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1113
1114         List<ReplicatedLogEntry> entries = List.of(
1115                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1116
1117         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1118
1119         followerActorRef.tell(appendEntries, leaderActor);
1120
1121         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1122         assertEquals("isSuccess", true, reply.isSuccess());
1123
1124         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1125
1126         InMemoryJournal.waitForDeleteMessagesComplete(id);
1127         InMemoryJournal.waitForWriteMessagesComplete(id);
1128         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1129         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1130         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1131         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1132         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1133         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1134         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1135
1136         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1137         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1138         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1139         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1140         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1141         assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData()),
1142                 MockRaftActor.fromState(snapshot.getState()));
1143     }
1144
1145     @Test
1146     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1147         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1148         logStart(id);
1149
1150         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1151
1152         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1153         config.setSnapshotBatchCount(2);
1154         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1155
1156         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1157         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1158         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1159                 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1160         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1161                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1162         followerRaftActor.set(followerActorRef.underlyingActor());
1163         followerRaftActor.get().waitForInitializeBehaviorComplete();
1164
1165         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1166         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1167         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1168
1169         List<ReplicatedLogEntry> entries = List.of(
1170                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1171                 newReplicatedLogEntry(1, 2, "three"));
1172
1173         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1174
1175         followerActorRef.tell(appendEntries, leaderActor);
1176
1177         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1178         assertEquals("isSuccess", true, reply.isSuccess());
1179
1180         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1181
1182         InMemoryJournal.waitForDeleteMessagesComplete(id);
1183         InMemoryJournal.waitForWriteMessagesComplete(id);
1184         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1185         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1186         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1187         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1188         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1189         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1190         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1191
1192         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1193         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1194         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1195         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1196         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1197         assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData(),
1198                 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1199
1200         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1201         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1202
1203         // Reinstate the actor from persistence
1204
1205         actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1206
1207         followerActorRef = actorFactory.createTestActor(builder.props()
1208                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1209         followerRaftActor.set(followerActorRef.underlyingActor());
1210         followerRaftActor.get().waitForInitializeBehaviorComplete();
1211
1212         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1213         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1214         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1215         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1216         assertEquals("State", List.of(entries.get(0).getData(), entries.get(1).getData(),
1217                 entries.get(2).getData()), followerRaftActor.get().getState());
1218     }
1219
1220     @Test
1221     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1222         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1223         logStart(id);
1224
1225         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1226
1227         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1228         config.setSnapshotBatchCount(1);
1229         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1230
1231         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1232         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1233         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1234                 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1235         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1236                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1237         followerRaftActor.set(followerActorRef.underlyingActor());
1238         followerRaftActor.get().waitForInitializeBehaviorComplete();
1239
1240         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1241         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1242         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1243
1244         List<ReplicatedLogEntry> entries = List.of(
1245                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1246                 newReplicatedLogEntry(1, 2, "three"));
1247
1248         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1249
1250         followerActorRef.tell(appendEntries, leaderActor);
1251
1252         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1253         assertEquals("isSuccess", true, reply.isSuccess());
1254
1255         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1256
1257         InMemoryJournal.waitForDeleteMessagesComplete(id);
1258         InMemoryJournal.waitForWriteMessagesComplete(id);
1259         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1260         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1261         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1262         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1263         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1264         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1265         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1266
1267         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1268         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).index());
1269         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).index());
1270         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1271         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1272         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1273         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1274         assertEquals("Snapshot state", List.of(entries.get(0).getData()),
1275                 MockRaftActor.fromState(snapshot.getState()));
1276     }
1277
1278     @Test
1279     public void testNeedsLeaderAddress() {
1280         logStart("testNeedsLeaderAddress");
1281
1282         MockRaftActorContext context = createActorContext();
1283         context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
1284         context.addToPeers("leader", null, VotingState.VOTING);
1285         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1286
1287         follower = createBehavior(context);
1288
1289         follower.handleMessage(leaderActor,
1290                 new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short)0));
1291
1292         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1293         assertTrue(reply.isNeedsLeaderAddress());
1294         MessageCollectorActor.clearMessages(leaderActor);
1295
1296         PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1297         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
1298
1299         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1,
1300                 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1301
1302         reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1303         assertFalse(reply.isNeedsLeaderAddress());
1304
1305         verify(mockResolver).setResolved("leader", leaderActor.path().toString());
1306     }
1307
1308     @SuppressWarnings("checkstyle:IllegalCatch")
1309     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1310             final AtomicReference<MockRaftActor> followerRaftActor) {
1311         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1312             @Override
1313             public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1314                 try {
1315                     actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1316                             installSnapshotStream), actorRef);
1317                 } catch (RuntimeException e) {
1318                     throw e;
1319                 } catch (Exception e) {
1320                     throw new RuntimeException(e);
1321                 }
1322             }
1323
1324             @Override
1325             public void applySnapshot(final State snapshotState) {
1326             }
1327
1328             @Override
1329             public State deserializeSnapshot(final ByteSource snapshotBytes) {
1330                 throw new UnsupportedOperationException();
1331             }
1332         };
1333         return snapshotCohort;
1334     }
1335
1336     public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1337         int snapshotLength = bs.size();
1338         int start = offset;
1339         int size = chunkSize;
1340         if (chunkSize > snapshotLength) {
1341             size = snapshotLength;
1342         } else if (start + chunkSize > snapshotLength) {
1343             size = snapshotLength - start;
1344         }
1345
1346         byte[] nextChunk = new byte[size];
1347         bs.copyTo(nextChunk, start, 0, size);
1348         return nextChunk;
1349     }
1350
1351     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1352             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1353         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1354     }
1355
1356     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1357             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1358             final boolean expForceInstallSnapshot) {
1359
1360         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1361                 AppendEntriesReply.class);
1362
1363         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1364         assertEquals("getTerm", expTerm, reply.getTerm());
1365         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1366         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1367         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1368         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1369         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1370         assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
1371     }
1372
1373
1374     private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1375         return new SimpleReplicatedLogEntry(index, term,
1376                 new MockRaftActorContext.MockPayload(data));
1377     }
1378
1379     private ByteString createSnapshot() {
1380         return toByteString(Map.of("1", "A", "2", "B", "3", "C"));
1381     }
1382
1383     @Override
1384     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1385             final ActorRef actorRef, final RaftRPC rpc) {
1386         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1387
1388         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1389         assertEquals("New votedFor", expVotedFor, actorContext.termInfo().votedFor());
1390     }
1391
1392     @Override
1393     protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1394         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1395         assertEquals("isSuccess", true, reply.isSuccess());
1396     }
1397 }