Make private methods static
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.Props;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Stopwatch;
24 import com.google.protobuf.ByteString;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftActorContext;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
40 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import scala.concurrent.duration.FiniteDuration;
50
51 public class FollowerTest extends AbstractRaftActorBehaviorTest {
52
    /** Message-collecting test actor created under a generated "follower" id; the follower contexts in this class are built around it. */
    private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));

    /** Message-collecting test actor created under a generated "leader" id; tests pass it as the sender of messages handed to the follower. */
    private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));

    /** Behavior under test; assigned by each test and closed in {@link #tearDown()} when non-null. */
    private RaftActorBehavior follower;

    /** Payload version set on every context built by {@link #createActorContext(ActorRef)}. */
    private final short payloadVersion = 5;
62
63     @Override
64     @After
65     public void tearDown() throws Exception {
66         if(follower != null) {
67             follower.close();
68         }
69
70         super.tearDown();
71     }
72
73     @Override
74     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
75         return new TestFollower(actorContext);
76     }
77
78     @Override
79     protected  MockRaftActorContext createActorContext() {
80         return createActorContext(followerActor);
81     }
82
83     @Override
84     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
85         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
86         context.setPayloadVersion(payloadVersion );
87         return context;
88     }
89
90     private static int getElectionTimeoutCount(RaftActorBehavior follower){
91         if(follower instanceof TestFollower){
92             return ((TestFollower) follower).getElectionTimeoutCount();
93         }
94         return -1;
95     }
96
97     @Test
98     public void testThatAnElectionTimeoutIsTriggered(){
99         MockRaftActorContext actorContext = createActorContext();
100         follower = new Follower(actorContext);
101
102         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
103                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
104     }
105
106     @Test
107     public void testHandleElectionTimeout(){
108         logStart("testHandleElectionTimeout");
109
110         follower = new Follower(createActorContext());
111
112         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
113
114         assertTrue(raftBehavior instanceof Candidate);
115     }
116
117     @Test
118     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
119         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
120
121         RaftActorContext context = createActorContext();
122         long term = 1000;
123         context.getTermInformation().update(term, null);
124
125         follower = createBehavior(context);
126
127         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
128
129         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
130
131         assertEquals("isVoteGranted", true, reply.isVoteGranted());
132         assertEquals("getTerm", term, reply.getTerm());
133         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
134     }
135
136     @Test
137     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
138         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
139
140         RaftActorContext context = createActorContext();
141         long term = 1000;
142         context.getTermInformation().update(term, "test");
143
144         follower = createBehavior(context);
145
146         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
147
148         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
149
150         assertEquals("isVoteGranted", false, reply.isVoteGranted());
151         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
152     }
153
154
155     @Test
156     public void testHandleFirstAppendEntries() throws Exception {
157         logStart("testHandleFirstAppendEntries");
158
159         MockRaftActorContext context = createActorContext();
160         context.getReplicatedLog().clear(0,2);
161         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
162         context.getReplicatedLog().setSnapshotIndex(99);
163
164         List<ReplicatedLogEntry> entries = Arrays.asList(
165                 newReplicatedLogEntry(2, 101, "foo"));
166
167         Assert.assertEquals(1, context.getReplicatedLog().size());
168
169         // The new commitIndex is 101
170         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
171
172         follower = createBehavior(context);
173         follower.handleMessage(leaderActor, appendEntries);
174
175         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
176         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
177
178         assertFalse(syncStatus.isInitialSyncDone());
179         assertTrue("append entries reply should be true", reply.isSuccess());
180     }
181
182     @Test
183     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
184         logStart("testHandleFirstAppendEntries");
185
186         MockRaftActorContext context = createActorContext();
187
188         List<ReplicatedLogEntry> entries = Arrays.asList(
189                 newReplicatedLogEntry(2, 101, "foo"));
190
191         // The new commitIndex is 101
192         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
193
194         follower = createBehavior(context);
195         follower.handleMessage(leaderActor, appendEntries);
196
197         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
198         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
199
200         assertFalse(syncStatus.isInitialSyncDone());
201         assertFalse("append entries reply should be false", reply.isSuccess());
202     }
203
204     @Test
205     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
206         logStart("testHandleFirstAppendEntries");
207
208         MockRaftActorContext context = createActorContext();
209         context.getReplicatedLog().clear(0,2);
210         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
211         context.getReplicatedLog().setSnapshotIndex(99);
212
213         List<ReplicatedLogEntry> entries = Arrays.asList(
214                 newReplicatedLogEntry(2, 101, "foo"));
215
216         // The new commitIndex is 101
217         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
218
219         follower = createBehavior(context);
220         follower.handleMessage(leaderActor, appendEntries);
221
222         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
223         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
224
225         assertFalse(syncStatus.isInitialSyncDone());
226         assertTrue("append entries reply should be true", reply.isSuccess());
227     }
228
229     @Test
230     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
231         logStart("testHandleFirstAppendEntries");
232
233         MockRaftActorContext context = createActorContext();
234         context.getReplicatedLog().clear(0,2);
235         context.getReplicatedLog().setSnapshotIndex(100);
236
237         List<ReplicatedLogEntry> entries = Arrays.asList(
238                 newReplicatedLogEntry(2, 101, "foo"));
239
240         // The new commitIndex is 101
241         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
242
243         follower = createBehavior(context);
244         follower.handleMessage(leaderActor, appendEntries);
245
246         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
247         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
248
249         assertFalse(syncStatus.isInitialSyncDone());
250         assertTrue("append entries reply should be true", reply.isSuccess());
251     }
252
253     @Test
254     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
255         logStart("testHandleFirstAppendEntries");
256
257         MockRaftActorContext context = createActorContext();
258         context.getReplicatedLog().clear(0,2);
259         context.getReplicatedLog().setSnapshotIndex(100);
260
261         List<ReplicatedLogEntry> entries = Arrays.asList(
262                 newReplicatedLogEntry(2, 105, "foo"));
263
264         // The new commitIndex is 101
265         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
266
267         follower = createBehavior(context);
268         follower.handleMessage(leaderActor, appendEntries);
269
270         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
271         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
272
273         assertFalse(syncStatus.isInitialSyncDone());
274         assertFalse("append entries reply should be false", reply.isSuccess());
275     }
276
277     @Test
278     public void testHandleSyncUpAppendEntries() throws Exception {
279         logStart("testHandleSyncUpAppendEntries");
280
281         MockRaftActorContext context = createActorContext();
282
283         List<ReplicatedLogEntry> entries = Arrays.asList(
284                 newReplicatedLogEntry(2, 101, "foo"));
285
286         // The new commitIndex is 101
287         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
288
289         follower = createBehavior(context);
290         follower.handleMessage(leaderActor, appendEntries);
291
292         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
293
294         assertFalse(syncStatus.isInitialSyncDone());
295
296         // Clear all the messages
297         followerActor.underlyingActor().clear();
298
299         context.setLastApplied(101);
300         context.setCommitIndex(101);
301         setLastLogEntry(context, 1, 101,
302                 new MockRaftActorContext.MockPayload(""));
303
304         entries = Arrays.asList(
305                 newReplicatedLogEntry(2, 101, "foo"));
306
307         // The new commitIndex is 101
308         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
309         follower.handleMessage(leaderActor, appendEntries);
310
311         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
312
313         assertTrue(syncStatus.isInitialSyncDone());
314
315         followerActor.underlyingActor().clear();
316
317         // Sending the same message again should not generate another message
318
319         follower.handleMessage(leaderActor, appendEntries);
320
321         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
322
323         assertNull(syncStatus);
324
325     }
326
327     @Test
328     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
329         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
330
331         MockRaftActorContext context = createActorContext();
332
333         List<ReplicatedLogEntry> entries = Arrays.asList(
334                 newReplicatedLogEntry(2, 101, "foo"));
335
336         // The new commitIndex is 101
337         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
338
339         follower = createBehavior(context);
340         follower.handleMessage(leaderActor, appendEntries);
341
342         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
343
344         assertFalse(syncStatus.isInitialSyncDone());
345
346         // Clear all the messages
347         followerActor.underlyingActor().clear();
348
349         context.setLastApplied(100);
350         setLastLogEntry(context, 1, 100,
351                 new MockRaftActorContext.MockPayload(""));
352
353         entries = Arrays.asList(
354                 newReplicatedLogEntry(2, 101, "foo"));
355
356         // leader-2 is becoming the leader now and it says the commitIndex is 45
357         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
358         follower.handleMessage(leaderActor, appendEntries);
359
360         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
361
362         // We get a new message saying initial status is not done
363         assertFalse(syncStatus.isInitialSyncDone());
364
365     }
366
367
368     @Test
369     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
370         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
371
372         MockRaftActorContext context = createActorContext();
373
374         List<ReplicatedLogEntry> entries = Arrays.asList(
375                 newReplicatedLogEntry(2, 101, "foo"));
376
377         // The new commitIndex is 101
378         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
379
380         follower = createBehavior(context);
381         follower.handleMessage(leaderActor, appendEntries);
382
383         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
384
385         assertFalse(syncStatus.isInitialSyncDone());
386
387         // Clear all the messages
388         followerActor.underlyingActor().clear();
389
390         context.setLastApplied(101);
391         context.setCommitIndex(101);
392         setLastLogEntry(context, 1, 101,
393                 new MockRaftActorContext.MockPayload(""));
394
395         entries = Arrays.asList(
396                 newReplicatedLogEntry(2, 101, "foo"));
397
398         // The new commitIndex is 101
399         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
400         follower.handleMessage(leaderActor, appendEntries);
401
402         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
403
404         assertTrue(syncStatus.isInitialSyncDone());
405
406         // Clear all the messages
407         followerActor.underlyingActor().clear();
408
409         context.setLastApplied(100);
410         setLastLogEntry(context, 1, 100,
411                 new MockRaftActorContext.MockPayload(""));
412
413         entries = Arrays.asList(
414                 newReplicatedLogEntry(2, 101, "foo"));
415
416         // leader-2 is becoming the leader now and it says the commitIndex is 45
417         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
418         follower.handleMessage(leaderActor, appendEntries);
419
420         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
421
422         // We get a new message saying initial status is not done
423         assertFalse(syncStatus.isInitialSyncDone());
424
425     }
426
427
428     /**
429      * This test verifies that when an AppendEntries RPC is received by a RaftActor
430      * with a commitIndex that is greater than what has been applied to the
431      * state machine of the RaftActor, the RaftActor applies the state and
432      * sets it current applied state to the commitIndex of the sender.
433      *
434      * @throws Exception
435      */
436     @Test
437     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
438         logStart("testHandleAppendEntriesWithNewerCommitIndex");
439
440         MockRaftActorContext context = createActorContext();
441
442         context.setLastApplied(100);
443         setLastLogEntry(context, 1, 100,
444                 new MockRaftActorContext.MockPayload(""));
445         context.getReplicatedLog().setSnapshotIndex(99);
446
447         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
448                 newReplicatedLogEntry(2, 101, "foo"));
449
450         // The new commitIndex is 101
451         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
452
453         follower = createBehavior(context);
454         follower.handleMessage(leaderActor, appendEntries);
455
456         assertEquals("getLastApplied", 101L, context.getLastApplied());
457     }
458
459     /**
460      * This test verifies that when an AppendEntries is received a specific prevLogTerm
461      * which does not match the term that is in RaftActors log entry at prevLogIndex
462      * then the RaftActor does not change it's state and it returns a failure.
463      *
464      * @throws Exception
465      */
466     @Test
467     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
468         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
469
470         MockRaftActorContext context = createActorContext();
471
472         // First set the receivers term to lower number
473         context.getTermInformation().update(95, "test");
474
475         // AppendEntries is now sent with a bigger term
476         // this will set the receivers term to be the same as the sender's term
477         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
478
479         follower = createBehavior(context);
480
481         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
482
483         Assert.assertSame(follower, newBehavior);
484
485         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
486                 AppendEntriesReply.class);
487
488         assertEquals("isSuccess", false, reply.isSuccess());
489     }
490
491     /**
492      * This test verifies that when a new AppendEntries message is received with
493      * new entries and the logs of the sender and receiver match that the new
494      * entries get added to the log and the log is incremented by the number of
495      * entries received in appendEntries
496      *
497      * @throws Exception
498      */
499     @Test
500     public void testHandleAppendEntriesAddNewEntries() {
501         logStart("testHandleAppendEntriesAddNewEntries");
502
503         MockRaftActorContext context = createActorContext();
504
505         // First set the receivers term to lower number
506         context.getTermInformation().update(1, "test");
507
508         // Prepare the receivers log
509         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
510         log.append(newReplicatedLogEntry(1, 0, "zero"));
511         log.append(newReplicatedLogEntry(1, 1, "one"));
512         log.append(newReplicatedLogEntry(1, 2, "two"));
513
514         context.setReplicatedLog(log);
515
516         // Prepare the entries to be sent with AppendEntries
517         List<ReplicatedLogEntry> entries = new ArrayList<>();
518         entries.add(newReplicatedLogEntry(1, 3, "three"));
519         entries.add(newReplicatedLogEntry(1, 4, "four"));
520
521         // Send appendEntries with the same term as was set on the receiver
522         // before the new behavior was created (1 in this case)
523         // This will not work for a Candidate because as soon as a Candidate
524         // is created it increments the term
525         short leaderPayloadVersion = 10;
526         String leaderId = "leader-1";
527         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
528
529         follower = createBehavior(context);
530
531         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
532
533         Assert.assertSame(follower, newBehavior);
534
535         assertEquals("Next index", 5, log.last().getIndex() + 1);
536         assertEquals("Entry 3", entries.get(0), log.get(3));
537         assertEquals("Entry 4", entries.get(1), log.get(4));
538
539         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
540         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
541
542         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
543     }
544
545     /**
546      * This test verifies that when a new AppendEntries message is received with
547      * new entries and the logs of the sender and receiver are out-of-sync that
548      * the log is first corrected by removing the out of sync entries from the
549      * log and then adding in the new entries sent with the AppendEntries message
550      */
551     @Test
552     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
553         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
554
555         MockRaftActorContext context = createActorContext();
556
557         // First set the receivers term to lower number
558         context.getTermInformation().update(1, "test");
559
560         // Prepare the receivers log
561         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
562         log.append(newReplicatedLogEntry(1, 0, "zero"));
563         log.append(newReplicatedLogEntry(1, 1, "one"));
564         log.append(newReplicatedLogEntry(1, 2, "two"));
565
566         context.setReplicatedLog(log);
567
568         // Prepare the entries to be sent with AppendEntries
569         List<ReplicatedLogEntry> entries = new ArrayList<>();
570         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
571         entries.add(newReplicatedLogEntry(2, 3, "three"));
572
573         // Send appendEntries with the same term as was set on the receiver
574         // before the new behavior was created (1 in this case)
575         // This will not work for a Candidate because as soon as a Candidate
576         // is created it increments the term
577         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
578
579         follower = createBehavior(context);
580
581         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
582
583         Assert.assertSame(follower, newBehavior);
584
585         // The entry at index 2 will be found out-of-sync with the leader
586         // and will be removed
587         // Then the two new entries will be added to the log
588         // Thus making the log to have 4 entries
589         assertEquals("Next index", 4, log.last().getIndex() + 1);
590         //assertEquals("Entry 2", entries.get(0), log.get(2));
591
592         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
593
594         // Check that the entry at index 2 has the new data
595         assertEquals("Entry 2", entries.get(0), log.get(2));
596
597         assertEquals("Entry 3", entries.get(1), log.get(3));
598
599         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
600     }
601
602     @Test
603     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
604         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
605
606         MockRaftActorContext context = createActorContext();
607
608         // First set the receivers term to lower number
609         context.getTermInformation().update(1, "test");
610
611         // Prepare the receivers log
612         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
613         log.append(newReplicatedLogEntry(1, 0, "zero"));
614         log.append(newReplicatedLogEntry(1, 1, "one"));
615         log.append(newReplicatedLogEntry(1, 2, "two"));
616
617         context.setReplicatedLog(log);
618
619         // Prepare the entries to be sent with AppendEntries
620         List<ReplicatedLogEntry> entries = new ArrayList<>();
621         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
622         entries.add(newReplicatedLogEntry(2, 3, "three"));
623
624         // Send appendEntries with the same term as was set on the receiver
625         // before the new behavior was created (1 in this case)
626         // This will not work for a Candidate because as soon as a Candidate
627         // is created it increments the term
628         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
629
630         context.setRaftPolicy(createRaftPolicy(false, true));
631         follower = createBehavior(context);
632
633         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
634
635         Assert.assertSame(follower, newBehavior);
636
637         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
638     }
639
640     @Test
641     public void testHandleAppendEntriesPreviousLogEntryMissing(){
642         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
643
644         MockRaftActorContext context = createActorContext();
645
646         // Prepare the receivers log
647         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
648         log.append(newReplicatedLogEntry(1, 0, "zero"));
649         log.append(newReplicatedLogEntry(1, 1, "one"));
650         log.append(newReplicatedLogEntry(1, 2, "two"));
651
652         context.setReplicatedLog(log);
653
654         // Prepare the entries to be sent with AppendEntries
655         List<ReplicatedLogEntry> entries = new ArrayList<>();
656         entries.add(newReplicatedLogEntry(1, 4, "four"));
657
658         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
659
660         follower = createBehavior(context);
661
662         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
663
664         Assert.assertSame(follower, newBehavior);
665
666         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
667     }
668
669     @Test
670     public void testHandleAppendEntriesWithExistingLogEntry() {
671         logStart("testHandleAppendEntriesWithExistingLogEntry");
672
673         MockRaftActorContext context = createActorContext();
674
675         context.getTermInformation().update(1, "test");
676
677         // Prepare the receivers log
678         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
679         log.append(newReplicatedLogEntry(1, 0, "zero"));
680         log.append(newReplicatedLogEntry(1, 1, "one"));
681
682         context.setReplicatedLog(log);
683
684         // Send the last entry again.
685         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
686
687         follower = createBehavior(context);
688
689         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
690
691         assertEquals("Next index", 2, log.last().getIndex() + 1);
692         assertEquals("Entry 1", entries.get(0), log.get(1));
693
694         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
695
696         // Send the last entry again and also a new one.
697
698         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
699
700         leaderActor.underlyingActor().clear();
701         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
702
703         assertEquals("Next index", 3, log.last().getIndex() + 1);
704         assertEquals("Entry 1", entries.get(0), log.get(1));
705         assertEquals("Entry 2", entries.get(1), log.get(2));
706
707         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
708     }
709
710     @Test
711     public void testHandleAppendEntriesAfterInstallingSnapshot(){
712         logStart("testHandleAppendAfterInstallingSnapshot");
713
714         MockRaftActorContext context = createActorContext();
715
716         // Prepare the receivers log
717         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
718
719         // Set up a log as if it has been snapshotted
720         log.setSnapshotIndex(3);
721         log.setSnapshotTerm(1);
722
723         context.setReplicatedLog(log);
724
725         // Prepare the entries to be sent with AppendEntries
726         List<ReplicatedLogEntry> entries = new ArrayList<>();
727         entries.add(newReplicatedLogEntry(1, 4, "four"));
728
729         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
730
731         follower = createBehavior(context);
732
733         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
734
735         Assert.assertSame(follower, newBehavior);
736
737         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
738     }
739
740
741     /**
742      * This test verifies that when InstallSnapshot is received by
743      * the follower its applied correctly.
744      *
745      * @throws Exception
746      */
747     @Test
748     public void testHandleInstallSnapshot() throws Exception {
749         logStart("testHandleInstallSnapshot");
750
751         MockRaftActorContext context = createActorContext();
752         context.getTermInformation().update(1, "leader");
753
754         follower = createBehavior(context);
755
756         ByteString bsSnapshot  = createSnapshot();
757         int offset = 0;
758         int snapshotLength = bsSnapshot.size();
759         int chunkSize = 50;
760         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
761         int lastIncludedIndex = 1;
762         int chunkIndex = 1;
763         InstallSnapshot lastInstallSnapshot = null;
764
765         for(int i = 0; i < totalChunks; i++) {
766             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
767             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
768                     chunkData, chunkIndex, totalChunks);
769             follower.handleMessage(leaderActor, lastInstallSnapshot);
770             offset = offset + 50;
771             lastIncludedIndex++;
772             chunkIndex++;
773         }
774
775         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
776                 ApplySnapshot.class);
777         Snapshot snapshot = applySnapshot.getSnapshot();
778         assertNotNull(lastInstallSnapshot);
779         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
780         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
781                 snapshot.getLastAppliedTerm());
782         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
783                 snapshot.getLastAppliedIndex());
784         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
785         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
786         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
787         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
788         applySnapshot.getCallback().onSuccess();
789
790         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
791                 leaderActor, InstallSnapshotReply.class);
792         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
793
794         chunkIndex = 1;
795         for(InstallSnapshotReply reply: replies) {
796             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
797             assertEquals("getTerm", 1, reply.getTerm());
798             assertEquals("isSuccess", true, reply.isSuccess());
799             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
800         }
801
802         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
803     }
804
805
806     /**
807      * Verify that when an AppendEntries is sent to a follower during a snapshot install
808      * the Follower short-circuits the processing of the AppendEntries message.
809      *
810      * @throws Exception
811      */
812     @Test
813     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
814         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
815
816         MockRaftActorContext context = createActorContext();
817
818         follower = createBehavior(context);
819
820         ByteString bsSnapshot  = createSnapshot();
821         int snapshotLength = bsSnapshot.size();
822         int chunkSize = 50;
823         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
824         int lastIncludedIndex = 1;
825
826         // Check that snapshot installation is not in progress
827         assertNull(((Follower) follower).getSnapshotTracker());
828
829         // Make sure that we have more than 1 chunk to send
830         assertTrue(totalChunks > 1);
831
832         // Send an install snapshot with the first chunk to start the process of installing a snapshot
833         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
834         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
835                 chunkData, 1, totalChunks));
836
837         // Check if snapshot installation is in progress now
838         assertNotNull(((Follower) follower).getSnapshotTracker());
839
840         // Send an append entry
841         AppendEntries appendEntries = mock(AppendEntries.class);
842         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
843
844         follower.handleMessage(leaderActor, appendEntries);
845
846         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
847         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
848         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
849         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
850
851         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
852         verify(appendEntries, never()).getPrevLogIndex();
853
854     }
855
856     @Test
857     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
858         logStart("testInitialSyncUpWithHandleInstallSnapshot");
859
860         MockRaftActorContext context = createActorContext();
861
862         follower = createBehavior(context);
863
864         ByteString bsSnapshot  = createSnapshot();
865         int offset = 0;
866         int snapshotLength = bsSnapshot.size();
867         int chunkSize = 50;
868         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
869         int lastIncludedIndex = 1;
870         int chunkIndex = 1;
871         InstallSnapshot lastInstallSnapshot = null;
872
873         for(int i = 0; i < totalChunks; i++) {
874             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
875             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
876                     chunkData, chunkIndex, totalChunks);
877             follower.handleMessage(leaderActor, lastInstallSnapshot);
878             offset = offset + 50;
879             lastIncludedIndex++;
880             chunkIndex++;
881         }
882
883         FollowerInitialSyncUpStatus syncStatus =
884                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
885
886         assertFalse(syncStatus.isInitialSyncDone());
887
888         // Clear all the messages
889         followerActor.underlyingActor().clear();
890
891         context.setLastApplied(101);
892         context.setCommitIndex(101);
893         setLastLogEntry(context, 1, 101,
894                 new MockRaftActorContext.MockPayload(""));
895
896         List<ReplicatedLogEntry> entries = Arrays.asList(
897                 newReplicatedLogEntry(2, 101, "foo"));
898
899         // The new commitIndex is 101
900         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
901         follower.handleMessage(leaderActor, appendEntries);
902
903         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
904
905         assertTrue(syncStatus.isInitialSyncDone());
906     }
907
908     @Test
909     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
910         logStart("testHandleOutOfSequenceInstallSnapshot");
911
912         MockRaftActorContext context = createActorContext();
913
914         follower = createBehavior(context);
915
916         ByteString bsSnapshot = createSnapshot();
917
918         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
919                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
920         follower.handleMessage(leaderActor, installSnapshot);
921
922         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
923                 InstallSnapshotReply.class);
924
925         assertEquals("isSuccess", false, reply.isSuccess());
926         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
927         assertEquals("getTerm", 1, reply.getTerm());
928         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
929
930         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
931     }
932
933     @Test
934     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
935         MockRaftActorContext context = createActorContext();
936
937         Stopwatch stopwatch = Stopwatch.createStarted();
938
939         follower = createBehavior(context);
940
941         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
942
943         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
944
945         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
946     }
947
948     @Test
949     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
950         MockRaftActorContext context = createActorContext();
951         context.setConfigParams(new DefaultConfigParamsImpl(){
952             @Override
953             public FiniteDuration getElectionTimeOutInterval() {
954                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
955             }
956         });
957
958         context.setRaftPolicy(createRaftPolicy(false, false));
959
960         follower = createBehavior(context);
961
962         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
963     }
964
965     @Test
966     public void testElectionScheduledWhenAnyRaftRPCReceived(){
967         MockRaftActorContext context = createActorContext();
968         follower = createBehavior(context);
969         follower.handleMessage(leaderActor, new RaftRPC() {
970             @Override
971             public long getTerm() {
972                 return 100;
973             }
974         });
975         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
976     }
977
978     @Test
979     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
980         MockRaftActorContext context = createActorContext();
981         follower = createBehavior(context);
982         follower.handleMessage(leaderActor, "non-raft-rpc");
983         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
984     }
985
986     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
987         int snapshotLength = bs.size();
988         int start = offset;
989         int size = chunkSize;
990         if (chunkSize > snapshotLength) {
991             size = snapshotLength;
992         } else {
993             if ((start + chunkSize) > snapshotLength) {
994                 size = snapshotLength - start;
995             }
996         }
997         return bs.substring(start, start + size);
998     }
999
1000     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1001             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1002         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1003     }
1004
1005     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1006                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1007                                                    boolean expForceInstallSnapshot) {
1008
1009         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1010                 AppendEntriesReply.class);
1011
1012         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1013         assertEquals("getTerm", expTerm, reply.getTerm());
1014         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1015         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1016         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1017         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1018         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1019     }
1020
1021
1022     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1023         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1024                 new MockRaftActorContext.MockPayload(data));
1025     }
1026
1027     private ByteString createSnapshot(){
1028         HashMap<String, String> followerSnapshot = new HashMap<>();
1029         followerSnapshot.put("1", "A");
1030         followerSnapshot.put("2", "B");
1031         followerSnapshot.put("3", "C");
1032
1033         return toByteString(followerSnapshot);
1034     }
1035
1036     @Override
1037     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1038             ActorRef actorRef, RaftRPC rpc) throws Exception {
1039         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1040
1041         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
1042         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1043     }
1044
1045     @Override
1046     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
1047             throws Exception {
1048         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1049         assertEquals("isSuccess", true, reply.isSuccess());
1050     }
1051
1052     private static class TestFollower extends Follower {
1053
1054         int electionTimeoutCount = 0;
1055
1056         public TestFollower(RaftActorContext context) {
1057             super(context);
1058         }
1059
1060         @Override
1061         protected void scheduleElection(FiniteDuration interval) {
1062             electionTimeoutCount++;
1063             super.scheduleElection(interval);
1064         }
1065
1066         public int getElectionTimeoutCount() {
1067             return electionTimeoutCount;
1068         }
1069     }
1070 }