3e6c7590c0b80cea60bb50148e015bc35c1d3207
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.dispatch.Dispatchers;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.io.ByteSource;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import com.google.protobuf.ByteString;
34 import java.io.OutputStream;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Assert;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
48 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
55 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
56 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
57 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
59 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
61 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
65 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
66 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
71 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
72 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
73 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import scala.concurrent.duration.FiniteDuration;
78
79 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
80     // Collector actor used as the follower's own actor ref when a context is built by createActorContext().
81     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
82             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
83     // Collector actor passed as the sender to handleMessage(); tests read the follower's replies from it.
84     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
85             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
86     // Behavior under test; assigned by each test and closed in tearDown() when non-null.
87     private Follower follower;
88     // Payload version applied to every context created by createActorContext(ActorRef).
89     private final short payloadVersion = 5;
90
91     @Override
92     @After
93     public void tearDown() {
94         if (follower != null) {
95             follower.close();
96         }
97
98         super.tearDown();
99     }
100     /** Creates the Follower under test wrapped in a Mockito spy so tests can verify() calls made on it. */
101     @Override
102     protected Follower createBehavior(final RaftActorContext actorContext) {
103         return spy(new Follower(actorContext));
104     }
105     /** Creates a context whose actor is {@code followerActor}; delegates to the ActorRef overload. */
106     @Override
107     protected  MockRaftActorContext createActorContext() {
108         return createActorContext(followerActor);
109     }
110
111     @Override
112     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
113         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
114         context.setPayloadVersion(payloadVersion);
115         return context;
116     }
117
118     @Test
119     public void testThatAnElectionTimeoutIsTriggered() {
120         MockRaftActorContext actorContext = createActorContext();
121         follower = new Follower(actorContext);
122
123         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
124                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
125     }
126
127     @Test
128     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
129         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
130
131         MockRaftActorContext context = createActorContext();
132         follower = new Follower(context);
133
134         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
135                 TimeUnit.MILLISECONDS);
136         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
137
138         assertTrue(raftBehavior instanceof Candidate);
139     }
140
141     @Test
142     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
143         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
144
145         MockRaftActorContext context = createActorContext();
146         ((DefaultConfigParamsImpl) context.getConfigParams())
147                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
148         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
149
150         follower = new Follower(context);
151         context.setCurrentBehavior(follower);
152
153         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
154                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
155         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
156                 -1, -1, (short) 1));
157
158         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
159         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
160         assertTrue(raftBehavior instanceof Follower);
161
162         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
163                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
164         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
165                 -1, -1, (short) 1));
166
167         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
168         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
169         assertTrue(raftBehavior instanceof Follower);
170     }
171
172     @Test
173     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
174         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
175
176         MockRaftActorContext context = createActorContext();
177         long term = 1000;
178         context.getTermInformation().update(term, null);
179
180         follower = createBehavior(context);
181
182         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
183
184         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
185
186         assertEquals("isVoteGranted", true, reply.isVoteGranted());
187         assertEquals("getTerm", term, reply.getTerm());
188         verify(follower).scheduleElection(any(FiniteDuration.class));
189     }
190
191     @Test
192     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
193         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
194
195         MockRaftActorContext context = createActorContext();
196         long term = 1000;
197         context.getTermInformation().update(term, "test");
198
199         follower = createBehavior(context);
200
201         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
202
203         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
204
205         assertEquals("isVoteGranted", false, reply.isVoteGranted());
206         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
207     }
208
209
210     @Test
211     public void testHandleFirstAppendEntries() {
212         logStart("testHandleFirstAppendEntries");
213
214         MockRaftActorContext context = createActorContext();
215         context.getReplicatedLog().clear(0,2);
216         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
217         context.getReplicatedLog().setSnapshotIndex(99);
218
219         List<ReplicatedLogEntry> entries = Arrays.asList(
220                 newReplicatedLogEntry(2, 101, "foo"));
221
222         Assert.assertEquals(1, context.getReplicatedLog().size());
223
224         // The new commitIndex is 101
225         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
226
227         follower = createBehavior(context);
228         follower.handleMessage(leaderActor, appendEntries);
229
230         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
231                 FollowerInitialSyncUpStatus.class);
232         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
233
234         assertFalse(syncStatus.isInitialSyncDone());
235         assertTrue("append entries reply should be true", reply.isSuccess());
236     }
237
238     @Test
239     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
240         logStart("testHandleFirstAppendEntries");
241
242         MockRaftActorContext context = createActorContext();
243
244         List<ReplicatedLogEntry> entries = Arrays.asList(
245                 newReplicatedLogEntry(2, 101, "foo"));
246
247         // The new commitIndex is 101
248         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
249
250         follower = createBehavior(context);
251         follower.handleMessage(leaderActor, appendEntries);
252
253         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
254                 FollowerInitialSyncUpStatus.class);
255         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
256
257         assertFalse(syncStatus.isInitialSyncDone());
258         assertFalse("append entries reply should be false", reply.isSuccess());
259     }
260
261     @Test
262     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
263         logStart("testHandleFirstAppendEntries");
264
265         MockRaftActorContext context = createActorContext();
266         context.getReplicatedLog().clear(0,2);
267         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
268         context.getReplicatedLog().setSnapshotIndex(99);
269
270         List<ReplicatedLogEntry> entries = Arrays.asList(
271                 newReplicatedLogEntry(2, 101, "foo"));
272
273         // The new commitIndex is 101
274         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
275
276         follower = createBehavior(context);
277         follower.handleMessage(leaderActor, appendEntries);
278
279         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
280                 FollowerInitialSyncUpStatus.class);
281         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
282
283         assertFalse(syncStatus.isInitialSyncDone());
284         assertTrue("append entries reply should be true", reply.isSuccess());
285     }
286
287     @Test
288     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
289         logStart("testHandleFirstAppendEntries");
290
291         MockRaftActorContext context = createActorContext();
292         context.getReplicatedLog().clear(0,2);
293         context.getReplicatedLog().setSnapshotIndex(100);
294
295         List<ReplicatedLogEntry> entries = Arrays.asList(
296                 newReplicatedLogEntry(2, 101, "foo"));
297
298         // The new commitIndex is 101
299         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
300
301         follower = createBehavior(context);
302         follower.handleMessage(leaderActor, appendEntries);
303
304         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
305                 FollowerInitialSyncUpStatus.class);
306         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
307
308         assertFalse(syncStatus.isInitialSyncDone());
309         assertTrue("append entries reply should be true", reply.isSuccess());
310     }
311
312     @Test
313     public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
314         logStart(
315                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
316
317         MockRaftActorContext context = createActorContext();
318         context.getReplicatedLog().clear(0,2);
319         context.getReplicatedLog().setSnapshotIndex(100);
320
321         List<ReplicatedLogEntry> entries = Arrays.asList(
322                 newReplicatedLogEntry(2, 105, "foo"));
323
324         // The new commitIndex is 101
325         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
326
327         follower = createBehavior(context);
328         follower.handleMessage(leaderActor, appendEntries);
329
330         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
331                 FollowerInitialSyncUpStatus.class);
332         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
333
334         assertFalse(syncStatus.isInitialSyncDone());
335         assertFalse("append entries reply should be false", reply.isSuccess());
336     }
337
338     @Test
339     public void testHandleSyncUpAppendEntries() {
340         logStart("testHandleSyncUpAppendEntries");
341
342         MockRaftActorContext context = createActorContext();
343
344         List<ReplicatedLogEntry> entries = Arrays.asList(
345                 newReplicatedLogEntry(2, 101, "foo"));
346
347         // The new commitIndex is 101
348         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
349
350         follower = createBehavior(context);
351         follower.handleMessage(leaderActor, appendEntries);
352
353         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
354                 FollowerInitialSyncUpStatus.class);
355
356         assertFalse(syncStatus.isInitialSyncDone());
357
358         // Clear all the messages
359         followerActor.underlyingActor().clear();
360
361         context.setLastApplied(101);
362         context.setCommitIndex(101);
363         setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
364
365         entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
366
367         // The new commitIndex is 101
368         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
369         follower.handleMessage(leaderActor, appendEntries);
370
371         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
372
373         assertTrue(syncStatus.isInitialSyncDone());
374
375         followerActor.underlyingActor().clear();
376
377         // Sending the same message again should not generate another message
378
379         follower.handleMessage(leaderActor, appendEntries);
380
381         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
382
383         assertNull(syncStatus);
384     }
385
386     @Test
387     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
388         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
389
390         MockRaftActorContext context = createActorContext();
391
392         List<ReplicatedLogEntry> entries = Arrays.asList(
393                 newReplicatedLogEntry(2, 101, "foo"));
394
395         // The new commitIndex is 101
396         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
397
398         follower = createBehavior(context);
399         follower.handleMessage(leaderActor, appendEntries);
400
401         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
402                 FollowerInitialSyncUpStatus.class);
403
404         assertFalse(syncStatus.isInitialSyncDone());
405
406         // Clear all the messages
407         followerActor.underlyingActor().clear();
408
409         context.setLastApplied(100);
410         setLastLogEntry(context, 1, 100,
411                 new MockRaftActorContext.MockPayload(""));
412
413         entries = Arrays.asList(
414                 newReplicatedLogEntry(2, 101, "foo"));
415
416         // leader-2 is becoming the leader now and it says the commitIndex is 45
417         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
418         follower.handleMessage(leaderActor, appendEntries);
419
420         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
421
422         // We get a new message saying initial status is not done
423         assertFalse(syncStatus.isInitialSyncDone());
424     }
425
426     @Test
427     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
428         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
429
430         MockRaftActorContext context = createActorContext();
431
432         List<ReplicatedLogEntry> entries = Arrays.asList(
433                 newReplicatedLogEntry(2, 101, "foo"));
434
435         // The new commitIndex is 101
436         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
437
438         follower = createBehavior(context);
439         follower.handleMessage(leaderActor, appendEntries);
440
441         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
442                 FollowerInitialSyncUpStatus.class);
443
444         assertFalse(syncStatus.isInitialSyncDone());
445
446         // Clear all the messages
447         followerActor.underlyingActor().clear();
448
449         context.setLastApplied(101);
450         context.setCommitIndex(101);
451         setLastLogEntry(context, 1, 101,
452                 new MockRaftActorContext.MockPayload(""));
453
454         entries = Arrays.asList(
455                 newReplicatedLogEntry(2, 101, "foo"));
456
457         // The new commitIndex is 101
458         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
459         follower.handleMessage(leaderActor, appendEntries);
460
461         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
462
463         assertTrue(syncStatus.isInitialSyncDone());
464
465         // Clear all the messages
466         followerActor.underlyingActor().clear();
467
468         context.setLastApplied(100);
469         setLastLogEntry(context, 1, 100,
470                 new MockRaftActorContext.MockPayload(""));
471
472         entries = Arrays.asList(
473                 newReplicatedLogEntry(2, 101, "foo"));
474
475         // leader-2 is becoming the leader now and it says the commitIndex is 45
476         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
477         follower.handleMessage(leaderActor, appendEntries);
478
479         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
480
481         // We get a new message saying initial status is not done
482         assertFalse(syncStatus.isInitialSyncDone());
483     }
484
485     /**
486      * This test verifies that when an AppendEntries RPC is received by a RaftActor
487      * with a commitIndex that is greater than what has been applied to the
488      * state machine of the RaftActor, the RaftActor applies the state and
489      * sets it current applied state to the commitIndex of the sender.
490      */
491     @Test
492     public void testHandleAppendEntriesWithNewerCommitIndex() {
493         logStart("testHandleAppendEntriesWithNewerCommitIndex");
494
495         MockRaftActorContext context = createActorContext();
496
497         context.setLastApplied(100);
498         setLastLogEntry(context, 1, 100,
499                 new MockRaftActorContext.MockPayload(""));
500         context.getReplicatedLog().setSnapshotIndex(99);
501
502         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
503                 newReplicatedLogEntry(2, 101, "foo"));
504
505         // The new commitIndex is 101
506         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
507
508         follower = createBehavior(context);
509         follower.handleMessage(leaderActor, appendEntries);
510
511         assertEquals("getLastApplied", 101L, context.getLastApplied());
512     }
513
514     /**
515      * This test verifies that when an AppendEntries is received a specific prevLogTerm
516      * which does not match the term that is in RaftActors log entry at prevLogIndex
517      * then the RaftActor does not change it's state and it returns a failure.
518      */
519     @Test
520     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
521         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
522
523         MockRaftActorContext context = createActorContext();
524
525         // First set the receivers term to lower number
526         context.getTermInformation().update(95, "test");
527
528         // AppendEntries is now sent with a bigger term
529         // this will set the receivers term to be the same as the sender's term
530         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
531                 (short)0);
532
533         follower = createBehavior(context);
534
535         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
536
537         Assert.assertSame(follower, newBehavior);
538
539         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
540                 AppendEntriesReply.class);
541
542         assertEquals("isSuccess", false, reply.isSuccess());
543     }
544
545     /**
546      * This test verifies that when a new AppendEntries message is received with
547      * new entries and the logs of the sender and receiver match that the new
548      * entries get added to the log and the log is incremented by the number of
549      * entries received in appendEntries.
550      */
551     @Test
552     public void testHandleAppendEntriesAddNewEntries() {
553         logStart("testHandleAppendEntriesAddNewEntries");
554
555         MockRaftActorContext context = createActorContext();
556
557         // First set the receivers term to lower number
558         context.getTermInformation().update(1, "test");
559
560         // Prepare the receivers log
561         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
562         log.append(newReplicatedLogEntry(1, 0, "zero"));
563         log.append(newReplicatedLogEntry(1, 1, "one"));
564         log.append(newReplicatedLogEntry(1, 2, "two"));
565
566         context.setReplicatedLog(log);
567
568         // Prepare the entries to be sent with AppendEntries
569         List<ReplicatedLogEntry> entries = new ArrayList<>();
570         entries.add(newReplicatedLogEntry(1, 3, "three"));
571         entries.add(newReplicatedLogEntry(1, 4, "four"));
572
573         // Send appendEntries with the same term as was set on the receiver
574         // before the new behavior was created (1 in this case)
575         // This will not work for a Candidate because as soon as a Candidate
576         // is created it increments the term
577         short leaderPayloadVersion = 10;
578         String leaderId = "leader-1";
579         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
580
581         follower = createBehavior(context);
582
583         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
584
585         Assert.assertSame(follower, newBehavior);
586
587         assertEquals("Next index", 5, log.last().getIndex() + 1);
588         assertEquals("Entry 3", entries.get(0), log.get(3));
589         assertEquals("Entry 4", entries.get(1), log.get(4));
590
591         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
592         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
593
594         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
595     }
596
597     /**
598      * This test verifies that when a new AppendEntries message is received with
599      * new entries and the logs of the sender and receiver are out-of-sync that
600      * the log is first corrected by removing the out of sync entries from the
601      * log and then adding in the new entries sent with the AppendEntries message.
602      */
603     @Test
604     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
605         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
606
607         MockRaftActorContext context = createActorContext();
608
609         // First set the receivers term to lower number
610         context.getTermInformation().update(1, "test");
611
612         // Prepare the receivers log
613         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
614         log.append(newReplicatedLogEntry(1, 0, "zero"));
615         log.append(newReplicatedLogEntry(1, 1, "one"));
616         log.append(newReplicatedLogEntry(1, 2, "two"));
617
618         context.setReplicatedLog(log);
619
620         // Prepare the entries to be sent with AppendEntries
621         List<ReplicatedLogEntry> entries = new ArrayList<>();
622         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
623         entries.add(newReplicatedLogEntry(2, 3, "three"));
624
625         // Send appendEntries with the same term as was set on the receiver
626         // before the new behavior was created (1 in this case)
627         // This will not work for a Candidate because as soon as a Candidate
628         // is created it increments the term
629         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
630
631         follower = createBehavior(context);
632
633         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
634
635         Assert.assertSame(follower, newBehavior);
636
637         // The entry at index 2 will be found out-of-sync with the leader
638         // and will be removed
639         // Then the two new entries will be added to the log
640         // Thus making the log to have 4 entries
641         assertEquals("Next index", 4, log.last().getIndex() + 1);
642         //assertEquals("Entry 2", entries.get(0), log.get(2));
643
644         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
645
646         // Check that the entry at index 2 has the new data
647         assertEquals("Entry 2", entries.get(0), log.get(2));
648
649         assertEquals("Entry 3", entries.get(1), log.get(3));
650
651         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
652     }
653
654     @Test
655     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
656         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
657
658         MockRaftActorContext context = createActorContext();
659
660         // First set the receivers term to lower number
661         context.getTermInformation().update(1, "test");
662
663         // Prepare the receivers log
664         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
665         log.append(newReplicatedLogEntry(1, 0, "zero"));
666         log.append(newReplicatedLogEntry(1, 1, "one"));
667         log.append(newReplicatedLogEntry(1, 2, "two"));
668
669         context.setReplicatedLog(log);
670
671         // Prepare the entries to be sent with AppendEntries
672         List<ReplicatedLogEntry> entries = new ArrayList<>();
673         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
674         entries.add(newReplicatedLogEntry(2, 3, "three"));
675
676         // Send appendEntries with the same term as was set on the receiver
677         // before the new behavior was created (1 in this case)
678         // This will not work for a Candidate because as soon as a Candidate
679         // is created it increments the term
680         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
681
682         context.setRaftPolicy(createRaftPolicy(false, true));
683         follower = createBehavior(context);
684
685         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
686
687         Assert.assertSame(follower, newBehavior);
688
689         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
690     }
691
692     @Test
693     public void testHandleAppendEntriesPreviousLogEntryMissing() {
694         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
695
696         final MockRaftActorContext context = createActorContext();
697
698         // Prepare the receivers log
699         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
700         log.append(newReplicatedLogEntry(1, 0, "zero"));
701         log.append(newReplicatedLogEntry(1, 1, "one"));
702         log.append(newReplicatedLogEntry(1, 2, "two"));
703
704         context.setReplicatedLog(log);
705
706         // Prepare the entries to be sent with AppendEntries
707         List<ReplicatedLogEntry> entries = new ArrayList<>();
708         entries.add(newReplicatedLogEntry(1, 4, "four"));
709
710         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
711
712         follower = createBehavior(context);
713
714         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
715
716         Assert.assertSame(follower, newBehavior);
717
718         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
719     }
720
721     @Test
722     public void testHandleAppendEntriesWithExistingLogEntry() {
723         logStart("testHandleAppendEntriesWithExistingLogEntry");
724
725         MockRaftActorContext context = createActorContext();
726
727         context.getTermInformation().update(1, "test");
728
729         // Prepare the receivers log
730         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
731         log.append(newReplicatedLogEntry(1, 0, "zero"));
732         log.append(newReplicatedLogEntry(1, 1, "one"));
733
734         context.setReplicatedLog(log);
735
736         // Send the last entry again.
737         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
738
739         follower = createBehavior(context);
740
741         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
742
743         assertEquals("Next index", 2, log.last().getIndex() + 1);
744         assertEquals("Entry 1", entries.get(0), log.get(1));
745
746         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
747
748         // Send the last entry again and also a new one.
749
750         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
751
752         leaderActor.underlyingActor().clear();
753         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
754
755         assertEquals("Next index", 3, log.last().getIndex() + 1);
756         assertEquals("Entry 1", entries.get(0), log.get(1));
757         assertEquals("Entry 2", entries.get(1), log.get(2));
758
759         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
760     }
761
762     @Test
763     public void testHandleAppendEntriesAfterInstallingSnapshot() {
764         logStart("testHandleAppendAfterInstallingSnapshot");
765
766         MockRaftActorContext context = createActorContext();
767
768         // Prepare the receivers log
769         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
770
771         // Set up a log as if it has been snapshotted
772         log.setSnapshotIndex(3);
773         log.setSnapshotTerm(1);
774
775         context.setReplicatedLog(log);
776
777         // Prepare the entries to be sent with AppendEntries
778         List<ReplicatedLogEntry> entries = new ArrayList<>();
779         entries.add(newReplicatedLogEntry(1, 4, "four"));
780
781         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
782
783         follower = createBehavior(context);
784
785         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
786
787         Assert.assertSame(follower, newBehavior);
788
789         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
790     }
791
792
793     /**
794      * This test verifies that when InstallSnapshot is received by
795      * the follower its applied correctly.
796      */
797     @Test
798     public void testHandleInstallSnapshot() {
799         logStart("testHandleInstallSnapshot");
800
801         MockRaftActorContext context = createActorContext();
802         context.getTermInformation().update(1, "leader");
803
804         follower = createBehavior(context);
805
806         ByteString bsSnapshot = createSnapshot();
807         int offset = 0;
808         int snapshotLength = bsSnapshot.size();
809         int chunkSize = 50;
810         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
811         int lastIncludedIndex = 1;
812         int chunkIndex = 1;
813         InstallSnapshot lastInstallSnapshot = null;
814
815         for (int i = 0; i < totalChunks; i++) {
816             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
817             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
818                     chunkData, chunkIndex, totalChunks);
819             follower.handleMessage(leaderActor, lastInstallSnapshot);
820             offset = offset + 50;
821             lastIncludedIndex++;
822             chunkIndex++;
823         }
824
825         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
826                 ApplySnapshot.class);
827         Snapshot snapshot = applySnapshot.getSnapshot();
828         assertNotNull(lastInstallSnapshot);
829         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
830         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
831                 snapshot.getLastAppliedTerm());
832         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
833                 snapshot.getLastAppliedIndex());
834         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
835         assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
836         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
837         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
838         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
839         applySnapshot.getCallback().onSuccess();
840
841         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
842                 leaderActor, InstallSnapshotReply.class);
843         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
844
845         chunkIndex = 1;
846         for (InstallSnapshotReply reply: replies) {
847             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
848             assertEquals("getTerm", 1, reply.getTerm());
849             assertEquals("isSuccess", true, reply.isSuccess());
850             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
851         }
852
853         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
854     }
855
856     /**
857      * Verify that when an AppendEntries is sent to a follower during a snapshot install
858      * the Follower short-circuits the processing of the AppendEntries message.
859      */
860     @Test
861     public void testReceivingAppendEntriesDuringInstallSnapshot() {
862         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
863
864         MockRaftActorContext context = createActorContext();
865
866         follower = createBehavior(context);
867
868         ByteString bsSnapshot  = createSnapshot();
869         int snapshotLength = bsSnapshot.size();
870         int chunkSize = 50;
871         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
872         int lastIncludedIndex = 1;
873
874         // Check that snapshot installation is not in progress
875         assertNull(follower.getSnapshotTracker());
876
877         // Make sure that we have more than 1 chunk to send
878         assertTrue(totalChunks > 1);
879
880         // Send an install snapshot with the first chunk to start the process of installing a snapshot
881         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
882         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
883                 chunkData, 1, totalChunks));
884
885         // Check if snapshot installation is in progress now
886         assertNotNull(follower.getSnapshotTracker());
887
888         // Send an append entry
889         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
890                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
891
892         follower.handleMessage(leaderActor, appendEntries);
893
894         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
895         assertEquals("isSuccess", true, reply.isSuccess());
896         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
897         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
898         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
899
900         assertNotNull(follower.getSnapshotTracker());
901     }
902
903     @Test
904     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
905         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
906
907         MockRaftActorContext context = createActorContext();
908
909         follower = createBehavior(context);
910
911         ByteString bsSnapshot  = createSnapshot();
912         int snapshotLength = bsSnapshot.size();
913         int chunkSize = 50;
914         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
915         int lastIncludedIndex = 1;
916
917         // Check that snapshot installation is not in progress
918         assertNull(follower.getSnapshotTracker());
919
920         // Make sure that we have more than 1 chunk to send
921         assertTrue(totalChunks > 1);
922
923         // Send an install snapshot with the first chunk to start the process of installing a snapshot
924         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
925         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
926                 chunkData, 1, totalChunks));
927
928         // Check if snapshot installation is in progress now
929         assertNotNull(follower.getSnapshotTracker());
930
931         // Send appendEntries with a new term and leader.
932         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
933                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
934
935         follower.handleMessage(leaderActor, appendEntries);
936
937         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
938         assertEquals("isSuccess", true, reply.isSuccess());
939         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
940         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
941         assertEquals("getTerm", 2, reply.getTerm());
942
943         assertNull(follower.getSnapshotTracker());
944     }
945
946     @Test
947     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
948         logStart("testInitialSyncUpWithHandleInstallSnapshot");
949
950         MockRaftActorContext context = createActorContext();
951         context.setCommitIndex(-1);
952
953         follower = createBehavior(context);
954
955         ByteString bsSnapshot  = createSnapshot();
956         int offset = 0;
957         int snapshotLength = bsSnapshot.size();
958         int chunkSize = 50;
959         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
960         int lastIncludedIndex = 1;
961         int chunkIndex = 1;
962         InstallSnapshot lastInstallSnapshot = null;
963
964         for (int i = 0; i < totalChunks; i++) {
965             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
966             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
967                     chunkData, chunkIndex, totalChunks);
968             follower.handleMessage(leaderActor, lastInstallSnapshot);
969             offset = offset + 50;
970             lastIncludedIndex++;
971             chunkIndex++;
972         }
973
974         FollowerInitialSyncUpStatus syncStatus =
975                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
976
977         assertFalse(syncStatus.isInitialSyncDone());
978
979         // Clear all the messages
980         followerActor.underlyingActor().clear();
981
982         context.setLastApplied(101);
983         context.setCommitIndex(101);
984         setLastLogEntry(context, 1, 101,
985                 new MockRaftActorContext.MockPayload(""));
986
987         List<ReplicatedLogEntry> entries = Arrays.asList(
988                 newReplicatedLogEntry(2, 101, "foo"));
989
990         // The new commitIndex is 101
991         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
992         follower.handleMessage(leaderActor, appendEntries);
993
994         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
995
996         assertTrue(syncStatus.isInitialSyncDone());
997     }
998
999     @Test
1000     public void testHandleOutOfSequenceInstallSnapshot() {
1001         logStart("testHandleOutOfSequenceInstallSnapshot");
1002
1003         MockRaftActorContext context = createActorContext();
1004
1005         follower = createBehavior(context);
1006
1007         ByteString bsSnapshot = createSnapshot();
1008
1009         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1010                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1011         follower.handleMessage(leaderActor, installSnapshot);
1012
1013         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1014                 InstallSnapshotReply.class);
1015
1016         assertEquals("isSuccess", false, reply.isSuccess());
1017         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1018         assertEquals("getTerm", 1, reply.getTerm());
1019         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1020
1021         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1022     }
1023
1024     @Test
1025     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1026         MockRaftActorContext context = createActorContext();
1027
1028         Stopwatch stopwatch = Stopwatch.createStarted();
1029
1030         follower = createBehavior(context);
1031
1032         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1033
1034         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1035
1036         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1037
1038         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1039         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1040     }
1041
1042     @Test
1043     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1044         MockRaftActorContext context = createActorContext();
1045         context.setConfigParams(new DefaultConfigParamsImpl() {
1046             @Override
1047             public FiniteDuration getElectionTimeOutInterval() {
1048                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1049             }
1050         });
1051
1052         context.setRaftPolicy(createRaftPolicy(false, false));
1053
1054         follower = createBehavior(context);
1055
1056         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1057         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1058         assertSame("handleMessage result", follower, newBehavior);
1059     }
1060
1061     @Test
1062     public void testFollowerSchedulesElectionIfNonVoting() {
1063         MockRaftActorContext context = createActorContext();
1064         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1065         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1066                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1067         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1068
1069         follower = new Follower(context, "leader", (short)1);
1070
1071         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1072                 ElectionTimeout.class);
1073         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1074         assertSame("handleMessage result", follower, newBehavior);
1075         assertNull("Expected null leaderId", follower.getLeaderId());
1076     }
1077
1078     @Test
1079     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1080         MockRaftActorContext context = createActorContext();
1081         follower = createBehavior(context);
1082         follower.handleMessage(leaderActor, new RaftRPC() {
1083             private static final long serialVersionUID = 1L;
1084
1085             @Override
1086             public long getTerm() {
1087                 return 100;
1088             }
1089         });
1090         verify(follower).scheduleElection(any(FiniteDuration.class));
1091     }
1092
1093     @Test
1094     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1095         MockRaftActorContext context = createActorContext();
1096         follower = createBehavior(context);
1097         follower.handleMessage(leaderActor, "non-raft-rpc");
1098         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1099     }
1100
1101     @Test
1102     public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1103         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1104         logStart(id);
1105
1106         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1107
1108         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1109         config.setSnapshotBatchCount(2);
1110         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1111
1112         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1113         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1114         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1115                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1116         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1117                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1118         followerRaftActor.set(followerActorRef.underlyingActor());
1119         followerRaftActor.get().waitForInitializeBehaviorComplete();
1120
1121         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1122         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1123         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1124
1125         List<ReplicatedLogEntry> entries = Arrays.asList(
1126                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1127
1128         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1129
1130         followerActorRef.tell(appendEntries, leaderActor);
1131
1132         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1133         assertEquals("isSuccess", true, reply.isSuccess());
1134
1135         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1136
1137         InMemoryJournal.waitForDeleteMessagesComplete(id);
1138         InMemoryJournal.waitForWriteMessagesComplete(id);
1139         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1140         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1141         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1142         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1143         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1144         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1145         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1146
1147         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1148         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1149         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1150         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1151         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1152         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1153                 MockRaftActor.fromState(snapshot.getState()));
1154     }
1155
1156     @Test
1157     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1158         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1159         logStart(id);
1160
1161         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1162
1163         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1164         config.setSnapshotBatchCount(2);
1165         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1166
1167         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1168         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1169         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1170                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1171         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1172                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1173         followerRaftActor.set(followerActorRef.underlyingActor());
1174         followerRaftActor.get().waitForInitializeBehaviorComplete();
1175
1176         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1177         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1178         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1179
1180         List<ReplicatedLogEntry> entries = Arrays.asList(
1181                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1182                 newReplicatedLogEntry(1, 2, "three"));
1183
1184         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1185
1186         followerActorRef.tell(appendEntries, leaderActor);
1187
1188         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1189         assertEquals("isSuccess", true, reply.isSuccess());
1190
1191         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1192
1193         InMemoryJournal.waitForDeleteMessagesComplete(id);
1194         InMemoryJournal.waitForWriteMessagesComplete(id);
1195         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1196         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1197         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1198         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1199         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1200         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1201         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1202
1203         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1204         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1205         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1206         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1207         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1208         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1209                 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1210
1211         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1212         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1213
1214         // Reinstate the actor from persistence
1215
1216         actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
1217
1218         followerActorRef = actorFactory.createTestActor(builder.props()
1219                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1220         followerRaftActor.set(followerActorRef.underlyingActor());
1221         followerRaftActor.get().waitForInitializeBehaviorComplete();
1222
1223         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1224         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1225         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1226         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1227         assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1228                 entries.get(2).getData()), followerRaftActor.get().getState());
1229     }
1230
1231     @Test
1232     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1233         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1234         logStart(id);
1235
1236         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1237
1238         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1239         config.setSnapshotBatchCount(1);
1240         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1241
1242         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1243         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1244         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1245                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1246         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1247                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1248         followerRaftActor.set(followerActorRef.underlyingActor());
1249         followerRaftActor.get().waitForInitializeBehaviorComplete();
1250
1251         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1252         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1253         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1254
1255         List<ReplicatedLogEntry> entries = Arrays.asList(
1256                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1257                 newReplicatedLogEntry(1, 2, "three"));
1258
1259         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1260
1261         followerActorRef.tell(appendEntries, leaderActor);
1262
1263         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1264         assertEquals("isSuccess", true, reply.isSuccess());
1265
1266         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1267
1268         InMemoryJournal.waitForDeleteMessagesComplete(id);
1269         InMemoryJournal.waitForWriteMessagesComplete(id);
1270         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1271         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1272         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1273         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1274         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1275         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1276         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1277
1278         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1279         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1280         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1281         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1282         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1283         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1284         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1285         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1286                 MockRaftActor.fromState(snapshot.getState()));
1287     }
1288
1289     @SuppressWarnings("checkstyle:IllegalCatch")
1290     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1291             final AtomicReference<MockRaftActor> followerRaftActor) {
1292         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1293             @Override
1294             public void createSnapshot(final ActorRef actorRef,
1295                     final java.util.Optional<OutputStream> installSnapshotStream) {
1296                 try {
1297                     actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1298                             installSnapshotStream), actorRef);
1299                 } catch (RuntimeException e) {
1300                     throw e;
1301                 } catch (Exception e) {
1302                     throw new RuntimeException(e);
1303                 }
1304             }
1305
1306             @Override
1307             public void applySnapshot(final State snapshotState) {
1308             }
1309
1310             @Override
1311             public State deserializeSnapshot(final ByteSource snapshotBytes) {
1312                 throw new UnsupportedOperationException();
1313             }
1314         };
1315         return snapshotCohort;
1316     }
1317
1318     public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1319         int snapshotLength = bs.size();
1320         int start = offset;
1321         int size = chunkSize;
1322         if (chunkSize > snapshotLength) {
1323             size = snapshotLength;
1324         } else {
1325             if (start + chunkSize > snapshotLength) {
1326                 size = snapshotLength - start;
1327             }
1328         }
1329
1330         byte[] nextChunk = new byte[size];
1331         bs.copyTo(nextChunk, start, 0, size);
1332         return nextChunk;
1333     }
1334
1335     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1336             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1337         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1338     }
1339
1340     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1341             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1342             final boolean expForceInstallSnapshot) {
1343
1344         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1345                 AppendEntriesReply.class);
1346
1347         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1348         assertEquals("getTerm", expTerm, reply.getTerm());
1349         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1350         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1351         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1352         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1353         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1354     }
1355
1356
1357     private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1358         return new SimpleReplicatedLogEntry(index, term,
1359                 new MockRaftActorContext.MockPayload(data));
1360     }
1361
1362     private ByteString createSnapshot() {
1363         HashMap<String, String> followerSnapshot = new HashMap<>();
1364         followerSnapshot.put("1", "A");
1365         followerSnapshot.put("2", "B");
1366         followerSnapshot.put("3", "C");
1367
1368         return toByteString(followerSnapshot);
1369     }
1370
1371     @Override
1372     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1373             final ActorRef actorRef, final RaftRPC rpc) {
1374         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1375
1376         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1377         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1378     }
1379
1380     @Override
1381     protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor) {
1382         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1383         assertEquals("isSuccess", true, reply.isSuccess());
1384     }
1385 }