BUG 2185 : Follower should request forceInstallSnapshot in out-of-sync scenario
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.Props;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Stopwatch;
24 import com.google.protobuf.ByteString;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftActorContext;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
40 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import scala.concurrent.duration.FiniteDuration;
50
51 public class FollowerTest extends AbstractRaftActorBehaviorTest {
52
53     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
54             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
55
56     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
57             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
58
59     private RaftActorBehavior follower;
60
61     private final short payloadVersion = 5;
62
63     @Override
64     @After
65     public void tearDown() throws Exception {
66         if(follower != null) {
67             follower.close();
68         }
69
70         super.tearDown();
71     }
72
73     @Override
74     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
75         return new TestFollower(actorContext);
76     }
77
78     @Override
79     protected  MockRaftActorContext createActorContext() {
80         return createActorContext(followerActor);
81     }
82
83     @Override
84     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
85         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
86         context.setPayloadVersion(payloadVersion );
87         return context;
88     }
89
90     private int getElectionTimeoutCount(RaftActorBehavior follower){
91         if(follower instanceof TestFollower){
92             return ((TestFollower) follower).getElectionTimeoutCount();
93         }
94         return -1;
95     }
96
97     @Test
98     public void testThatAnElectionTimeoutIsTriggered(){
99         MockRaftActorContext actorContext = createActorContext();
100         follower = new Follower(actorContext);
101
102         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
103                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
104     }
105
106     @Test
107     public void testHandleElectionTimeout(){
108         logStart("testHandleElectionTimeout");
109
110         follower = new Follower(createActorContext());
111
112         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
113
114         assertTrue(raftBehavior instanceof Candidate);
115     }
116
117     @Test
118     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
119         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
120
121         RaftActorContext context = createActorContext();
122         long term = 1000;
123         context.getTermInformation().update(term, null);
124
125         follower = createBehavior(context);
126
127         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
128
129         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
130
131         assertEquals("isVoteGranted", true, reply.isVoteGranted());
132         assertEquals("getTerm", term, reply.getTerm());
133         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
134     }
135
136     @Test
137     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
138         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
139
140         RaftActorContext context = createActorContext();
141         long term = 1000;
142         context.getTermInformation().update(term, "test");
143
144         follower = createBehavior(context);
145
146         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
147
148         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
149
150         assertEquals("isVoteGranted", false, reply.isVoteGranted());
151         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
152     }
153
154
155     @Test
156     public void testHandleFirstAppendEntries() throws Exception {
157         logStart("testHandleFirstAppendEntries");
158
159         MockRaftActorContext context = createActorContext();
160         context.getReplicatedLog().clear(0,2);
161         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
162         context.getReplicatedLog().setSnapshotIndex(99);
163
164         List<ReplicatedLogEntry> entries = Arrays.asList(
165                 newReplicatedLogEntry(2, 101, "foo"));
166
167         Assert.assertEquals(1, context.getReplicatedLog().size());
168
169         // The new commitIndex is 101
170         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
171
172         follower = createBehavior(context);
173         follower.handleMessage(leaderActor, appendEntries);
174
175         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
176         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
177
178         assertFalse(syncStatus.isInitialSyncDone());
179         assertTrue("append entries reply should be true", reply.isSuccess());
180     }
181
182     @Test
183     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
184         logStart("testHandleFirstAppendEntries");
185
186         MockRaftActorContext context = createActorContext();
187
188         List<ReplicatedLogEntry> entries = Arrays.asList(
189                 newReplicatedLogEntry(2, 101, "foo"));
190
191         // The new commitIndex is 101
192         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
193
194         follower = createBehavior(context);
195         follower.handleMessage(leaderActor, appendEntries);
196
197         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
198         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
199
200         assertFalse(syncStatus.isInitialSyncDone());
201         assertFalse("append entries reply should be false", reply.isSuccess());
202     }
203
204     @Test
205     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
206         logStart("testHandleFirstAppendEntries");
207
208         MockRaftActorContext context = createActorContext();
209         context.getReplicatedLog().clear(0,2);
210         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
211         context.getReplicatedLog().setSnapshotIndex(99);
212
213         List<ReplicatedLogEntry> entries = Arrays.asList(
214                 newReplicatedLogEntry(2, 101, "foo"));
215
216         // The new commitIndex is 101
217         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
218
219         follower = createBehavior(context);
220         follower.handleMessage(leaderActor, appendEntries);
221
222         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
223         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
224
225         assertFalse(syncStatus.isInitialSyncDone());
226         assertTrue("append entries reply should be true", reply.isSuccess());
227     }
228
229     @Test
230     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
231         logStart("testHandleFirstAppendEntries");
232
233         MockRaftActorContext context = createActorContext();
234         context.getReplicatedLog().clear(0,2);
235         context.getReplicatedLog().setSnapshotIndex(100);
236
237         List<ReplicatedLogEntry> entries = Arrays.asList(
238                 newReplicatedLogEntry(2, 101, "foo"));
239
240         // The new commitIndex is 101
241         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
242
243         follower = createBehavior(context);
244         follower.handleMessage(leaderActor, appendEntries);
245
246         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
247         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
248
249         assertFalse(syncStatus.isInitialSyncDone());
250         assertTrue("append entries reply should be true", reply.isSuccess());
251     }
252
253     @Test
254     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
255         logStart("testHandleFirstAppendEntries");
256
257         MockRaftActorContext context = createActorContext();
258         context.getReplicatedLog().clear(0,2);
259         context.getReplicatedLog().setSnapshotIndex(100);
260
261         List<ReplicatedLogEntry> entries = Arrays.asList(
262                 newReplicatedLogEntry(2, 105, "foo"));
263
264         // The new commitIndex is 101
265         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
266
267         follower = createBehavior(context);
268         follower.handleMessage(leaderActor, appendEntries);
269
270         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
271         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
272
273         assertFalse(syncStatus.isInitialSyncDone());
274         assertFalse("append entries reply should be false", reply.isSuccess());
275     }
276
277     @Test
278     public void testHandleSyncUpAppendEntries() throws Exception {
279         logStart("testHandleSyncUpAppendEntries");
280
281         MockRaftActorContext context = createActorContext();
282
283         List<ReplicatedLogEntry> entries = Arrays.asList(
284                 newReplicatedLogEntry(2, 101, "foo"));
285
286         // The new commitIndex is 101
287         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
288
289         follower = createBehavior(context);
290         follower.handleMessage(leaderActor, appendEntries);
291
292         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
293
294         assertFalse(syncStatus.isInitialSyncDone());
295
296         // Clear all the messages
297         followerActor.underlyingActor().clear();
298
299         context.setLastApplied(101);
300         context.setCommitIndex(101);
301         setLastLogEntry(context, 1, 101,
302                 new MockRaftActorContext.MockPayload(""));
303
304         entries = Arrays.asList(
305                 newReplicatedLogEntry(2, 101, "foo"));
306
307         // The new commitIndex is 101
308         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
309         follower.handleMessage(leaderActor, appendEntries);
310
311         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
312
313         assertTrue(syncStatus.isInitialSyncDone());
314
315         followerActor.underlyingActor().clear();
316
317         // Sending the same message again should not generate another message
318
319         follower.handleMessage(leaderActor, appendEntries);
320
321         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
322
323         assertNull(syncStatus);
324
325     }
326
327     @Test
328     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
329         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
330
331         MockRaftActorContext context = createActorContext();
332
333         List<ReplicatedLogEntry> entries = Arrays.asList(
334                 newReplicatedLogEntry(2, 101, "foo"));
335
336         // The new commitIndex is 101
337         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
338
339         follower = createBehavior(context);
340         follower.handleMessage(leaderActor, appendEntries);
341
342         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
343
344         assertFalse(syncStatus.isInitialSyncDone());
345
346         // Clear all the messages
347         followerActor.underlyingActor().clear();
348
349         context.setLastApplied(100);
350         setLastLogEntry(context, 1, 100,
351                 new MockRaftActorContext.MockPayload(""));
352
353         entries = Arrays.asList(
354                 newReplicatedLogEntry(2, 101, "foo"));
355
356         // leader-2 is becoming the leader now and it says the commitIndex is 45
357         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
358         follower.handleMessage(leaderActor, appendEntries);
359
360         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
361
362         // We get a new message saying initial status is not done
363         assertFalse(syncStatus.isInitialSyncDone());
364
365     }
366
367
368     @Test
369     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
370         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
371
372         MockRaftActorContext context = createActorContext();
373
374         List<ReplicatedLogEntry> entries = Arrays.asList(
375                 newReplicatedLogEntry(2, 101, "foo"));
376
377         // The new commitIndex is 101
378         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
379
380         follower = createBehavior(context);
381         follower.handleMessage(leaderActor, appendEntries);
382
383         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
384
385         assertFalse(syncStatus.isInitialSyncDone());
386
387         // Clear all the messages
388         followerActor.underlyingActor().clear();
389
390         context.setLastApplied(101);
391         context.setCommitIndex(101);
392         setLastLogEntry(context, 1, 101,
393                 new MockRaftActorContext.MockPayload(""));
394
395         entries = Arrays.asList(
396                 newReplicatedLogEntry(2, 101, "foo"));
397
398         // The new commitIndex is 101
399         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
400         follower.handleMessage(leaderActor, appendEntries);
401
402         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
403
404         assertTrue(syncStatus.isInitialSyncDone());
405
406         // Clear all the messages
407         followerActor.underlyingActor().clear();
408
409         context.setLastApplied(100);
410         setLastLogEntry(context, 1, 100,
411                 new MockRaftActorContext.MockPayload(""));
412
413         entries = Arrays.asList(
414                 newReplicatedLogEntry(2, 101, "foo"));
415
416         // leader-2 is becoming the leader now and it says the commitIndex is 45
417         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
418         follower.handleMessage(leaderActor, appendEntries);
419
420         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
421
422         // We get a new message saying initial status is not done
423         assertFalse(syncStatus.isInitialSyncDone());
424
425     }
426
427
428     /**
429      * This test verifies that when an AppendEntries RPC is received by a RaftActor
430      * with a commitIndex that is greater than what has been applied to the
431      * state machine of the RaftActor, the RaftActor applies the state and
432      * sets it current applied state to the commitIndex of the sender.
433      *
434      * @throws Exception
435      */
436     @Test
437     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
438         logStart("testHandleAppendEntriesWithNewerCommitIndex");
439
440         MockRaftActorContext context = createActorContext();
441
442         context.setLastApplied(100);
443         setLastLogEntry(context, 1, 100,
444                 new MockRaftActorContext.MockPayload(""));
445         context.getReplicatedLog().setSnapshotIndex(99);
446
447         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
448                 newReplicatedLogEntry(2, 101, "foo"));
449
450         // The new commitIndex is 101
451         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
452
453         follower = createBehavior(context);
454         follower.handleMessage(leaderActor, appendEntries);
455
456         assertEquals("getLastApplied", 101L, context.getLastApplied());
457     }
458
459     /**
460      * This test verifies that when an AppendEntries is received a specific prevLogTerm
461      * which does not match the term that is in RaftActors log entry at prevLogIndex
462      * then the RaftActor does not change it's state and it returns a failure.
463      *
464      * @throws Exception
465      */
466     @Test
467     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
468         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
469
470         MockRaftActorContext context = createActorContext();
471
472         // First set the receivers term to lower number
473         context.getTermInformation().update(95, "test");
474
475         // AppendEntries is now sent with a bigger term
476         // this will set the receivers term to be the same as the sender's term
477         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
478
479         follower = createBehavior(context);
480
481         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
482
483         Assert.assertSame(follower, newBehavior);
484
485         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
486                 AppendEntriesReply.class);
487
488         assertEquals("isSuccess", false, reply.isSuccess());
489     }
490
491     /**
492      * This test verifies that when a new AppendEntries message is received with
493      * new entries and the logs of the sender and receiver match that the new
494      * entries get added to the log and the log is incremented by the number of
495      * entries received in appendEntries
496      *
497      * @throws Exception
498      */
499     @Test
500     public void testHandleAppendEntriesAddNewEntries() {
501         logStart("testHandleAppendEntriesAddNewEntries");
502
503         MockRaftActorContext context = createActorContext();
504
505         // First set the receivers term to lower number
506         context.getTermInformation().update(1, "test");
507
508         // Prepare the receivers log
509         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
510         log.append(newReplicatedLogEntry(1, 0, "zero"));
511         log.append(newReplicatedLogEntry(1, 1, "one"));
512         log.append(newReplicatedLogEntry(1, 2, "two"));
513
514         context.setReplicatedLog(log);
515
516         // Prepare the entries to be sent with AppendEntries
517         List<ReplicatedLogEntry> entries = new ArrayList<>();
518         entries.add(newReplicatedLogEntry(1, 3, "three"));
519         entries.add(newReplicatedLogEntry(1, 4, "four"));
520
521         // Send appendEntries with the same term as was set on the receiver
522         // before the new behavior was created (1 in this case)
523         // This will not work for a Candidate because as soon as a Candidate
524         // is created it increments the term
525         short leaderPayloadVersion = 10;
526         String leaderId = "leader-1";
527         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
528
529         follower = createBehavior(context);
530
531         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
532
533         Assert.assertSame(follower, newBehavior);
534
535         assertEquals("Next index", 5, log.last().getIndex() + 1);
536         assertEquals("Entry 3", entries.get(0), log.get(3));
537         assertEquals("Entry 4", entries.get(1), log.get(4));
538
539         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
540         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
541
542         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
543     }
544
545     /**
546      * This test verifies that when a new AppendEntries message is received with
547      * new entries and the logs of the sender and receiver are out-of-sync that
548      * the log is first corrected by removing the out of sync entries from the
549      * log and then adding in the new entries sent with the AppendEntries message
550      */
551     @Test
552     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
553         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
554
555         MockRaftActorContext context = createActorContext();
556
557         // First set the receivers term to lower number
558         context.getTermInformation().update(1, "test");
559
560         // Prepare the receivers log
561         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
562         log.append(newReplicatedLogEntry(1, 0, "zero"));
563         log.append(newReplicatedLogEntry(1, 1, "one"));
564         log.append(newReplicatedLogEntry(1, 2, "two"));
565
566         context.setReplicatedLog(log);
567
568         // Prepare the entries to be sent with AppendEntries
569         List<ReplicatedLogEntry> entries = new ArrayList<>();
570         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
571         entries.add(newReplicatedLogEntry(2, 3, "three"));
572
573         // Send appendEntries with the same term as was set on the receiver
574         // before the new behavior was created (1 in this case)
575         // This will not work for a Candidate because as soon as a Candidate
576         // is created it increments the term
577         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
578
579         follower = createBehavior(context);
580
581         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
582
583         Assert.assertSame(follower, newBehavior);
584
585         // The entry at index 2 will be found out-of-sync with the leader
586         // and will be removed
587         // Then the two new entries will be added to the log
588         // Thus making the log to have 4 entries
589         assertEquals("Next index", 4, log.last().getIndex() + 1);
590         //assertEquals("Entry 2", entries.get(0), log.get(2));
591
592         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
593
594         // Check that the entry at index 2 has the new data
595         assertEquals("Entry 2", entries.get(0), log.get(2));
596
597         assertEquals("Entry 3", entries.get(1), log.get(3));
598
599         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
600     }
601
602     @Test
603     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
604         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
605
606         MockRaftActorContext context = createActorContext();
607
608         // First set the receivers term to lower number
609         context.getTermInformation().update(1, "test");
610
611         // Prepare the receivers log
612         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
613         log.append(newReplicatedLogEntry(1, 0, "zero"));
614         log.append(newReplicatedLogEntry(1, 1, "one"));
615         log.append(newReplicatedLogEntry(1, 2, "two"));
616
617         context.setReplicatedLog(log);
618
619         // Prepare the entries to be sent with AppendEntries
620         List<ReplicatedLogEntry> entries = new ArrayList<>();
621         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
622         entries.add(newReplicatedLogEntry(2, 3, "three"));
623
624         // Send appendEntries with the same term as was set on the receiver
625         // before the new behavior was created (1 in this case)
626         // This will not work for a Candidate because as soon as a Candidate
627         // is created it increments the term
628         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
629
630         context.setRaftPolicy(createRaftPolicy(false, true));
631         follower = createBehavior(context);
632
633         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
634
635         Assert.assertSame(follower, newBehavior);
636
637         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
638     }
639
640     @Test
641     public void testHandleAppendEntriesPreviousLogEntryMissing(){
642         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
643
644         MockRaftActorContext context = createActorContext();
645
646         // Prepare the receivers log
647         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
648         log.append(newReplicatedLogEntry(1, 0, "zero"));
649         log.append(newReplicatedLogEntry(1, 1, "one"));
650         log.append(newReplicatedLogEntry(1, 2, "two"));
651
652         context.setReplicatedLog(log);
653
654         // Prepare the entries to be sent with AppendEntries
655         List<ReplicatedLogEntry> entries = new ArrayList<>();
656         entries.add(newReplicatedLogEntry(1, 4, "four"));
657
658         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
659
660         follower = createBehavior(context);
661
662         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
663
664         Assert.assertSame(follower, newBehavior);
665
666         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
667     }
668
669     @Test
670     public void testHandleAppendEntriesWithExistingLogEntry() {
671         logStart("testHandleAppendEntriesWithExistingLogEntry");
672
673         MockRaftActorContext context = createActorContext();
674
675         context.getTermInformation().update(1, "test");
676
677         // Prepare the receivers log
678         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
679         log.append(newReplicatedLogEntry(1, 0, "zero"));
680         log.append(newReplicatedLogEntry(1, 1, "one"));
681
682         context.setReplicatedLog(log);
683
684         // Send the last entry again.
685         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
686
687         follower = createBehavior(context);
688
689         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
690
691         assertEquals("Next index", 2, log.last().getIndex() + 1);
692         assertEquals("Entry 1", entries.get(0), log.get(1));
693
694         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
695
696         // Send the last entry again and also a new one.
697
698         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
699
700         leaderActor.underlyingActor().clear();
701         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
702
703         assertEquals("Next index", 3, log.last().getIndex() + 1);
704         assertEquals("Entry 1", entries.get(0), log.get(1));
705         assertEquals("Entry 2", entries.get(1), log.get(2));
706
707         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
708     }
709
710     @Test
711     public void testHandleAppendEntriesAfterInstallingSnapshot(){
712         logStart("testHandleAppendAfterInstallingSnapshot");
713
714         MockRaftActorContext context = createActorContext();
715
716         // Prepare the receivers log
717         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
718
719         // Set up a log as if it has been snapshotted
720         log.setSnapshotIndex(3);
721         log.setSnapshotTerm(1);
722
723         context.setReplicatedLog(log);
724
725         // Prepare the entries to be sent with AppendEntries
726         List<ReplicatedLogEntry> entries = new ArrayList<>();
727         entries.add(newReplicatedLogEntry(1, 4, "four"));
728
729         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
730
731         follower = createBehavior(context);
732
733         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
734
735         Assert.assertSame(follower, newBehavior);
736
737         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
738     }
739
740
741     /**
742      * This test verifies that when InstallSnapshot is received by
743      * the follower its applied correctly.
744      *
745      * @throws Exception
746      */
747     @Test
748     public void testHandleInstallSnapshot() throws Exception {
749         logStart("testHandleInstallSnapshot");
750
751         MockRaftActorContext context = createActorContext();
752
753         follower = createBehavior(context);
754
755         ByteString bsSnapshot  = createSnapshot();
756         int offset = 0;
757         int snapshotLength = bsSnapshot.size();
758         int chunkSize = 50;
759         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
760         int lastIncludedIndex = 1;
761         int chunkIndex = 1;
762         InstallSnapshot lastInstallSnapshot = null;
763
764         for(int i = 0; i < totalChunks; i++) {
765             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
766             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
767                     chunkData, chunkIndex, totalChunks);
768             follower.handleMessage(leaderActor, lastInstallSnapshot);
769             offset = offset + 50;
770             lastIncludedIndex++;
771             chunkIndex++;
772         }
773
774         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
775                 ApplySnapshot.class);
776         Snapshot snapshot = applySnapshot.getSnapshot();
777         assertNotNull(lastInstallSnapshot);
778         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
779         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
780                 snapshot.getLastAppliedTerm());
781         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
782                 snapshot.getLastAppliedIndex());
783         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
784         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
785
786         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
787                 leaderActor, InstallSnapshotReply.class);
788         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
789
790         chunkIndex = 1;
791         for(InstallSnapshotReply reply: replies) {
792             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
793             assertEquals("getTerm", 1, reply.getTerm());
794             assertEquals("isSuccess", true, reply.isSuccess());
795             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
796         }
797
798         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
799     }
800
801
802     /**
803      * Verify that when an AppendEntries is sent to a follower during a snapshot install
804      * the Follower short-circuits the processing of the AppendEntries message.
805      *
806      * @throws Exception
807      */
808     @Test
809     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
810         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
811
812         MockRaftActorContext context = createActorContext();
813
814         follower = createBehavior(context);
815
816         ByteString bsSnapshot  = createSnapshot();
817         int snapshotLength = bsSnapshot.size();
818         int chunkSize = 50;
819         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
820         int lastIncludedIndex = 1;
821
822         // Check that snapshot installation is not in progress
823         assertNull(((Follower) follower).getSnapshotTracker());
824
825         // Make sure that we have more than 1 chunk to send
826         assertTrue(totalChunks > 1);
827
828         // Send an install snapshot with the first chunk to start the process of installing a snapshot
829         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
830         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
831                 chunkData, 1, totalChunks));
832
833         // Check if snapshot installation is in progress now
834         assertNotNull(((Follower) follower).getSnapshotTracker());
835
836         // Send an append entry
837         AppendEntries appendEntries = mock(AppendEntries.class);
838         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
839
840         follower.handleMessage(leaderActor, appendEntries);
841
842         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
843         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
844         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
845         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
846
847         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
848         verify(appendEntries, never()).getPrevLogIndex();
849
850     }
851
852     @Test
853     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
854         logStart("testInitialSyncUpWithHandleInstallSnapshot");
855
856         MockRaftActorContext context = createActorContext();
857
858         follower = createBehavior(context);
859
860         ByteString bsSnapshot  = createSnapshot();
861         int offset = 0;
862         int snapshotLength = bsSnapshot.size();
863         int chunkSize = 50;
864         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
865         int lastIncludedIndex = 1;
866         int chunkIndex = 1;
867         InstallSnapshot lastInstallSnapshot = null;
868
869         for(int i = 0; i < totalChunks; i++) {
870             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
871             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
872                     chunkData, chunkIndex, totalChunks);
873             follower.handleMessage(leaderActor, lastInstallSnapshot);
874             offset = offset + 50;
875             lastIncludedIndex++;
876             chunkIndex++;
877         }
878
879         FollowerInitialSyncUpStatus syncStatus =
880                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
881
882         assertFalse(syncStatus.isInitialSyncDone());
883
884         // Clear all the messages
885         followerActor.underlyingActor().clear();
886
887         context.setLastApplied(101);
888         context.setCommitIndex(101);
889         setLastLogEntry(context, 1, 101,
890                 new MockRaftActorContext.MockPayload(""));
891
892         List<ReplicatedLogEntry> entries = Arrays.asList(
893                 newReplicatedLogEntry(2, 101, "foo"));
894
895         // The new commitIndex is 101
896         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
897         follower.handleMessage(leaderActor, appendEntries);
898
899         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
900
901         assertTrue(syncStatus.isInitialSyncDone());
902     }
903
904     @Test
905     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
906         logStart("testHandleOutOfSequenceInstallSnapshot");
907
908         MockRaftActorContext context = createActorContext();
909
910         follower = createBehavior(context);
911
912         ByteString bsSnapshot = createSnapshot();
913
914         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
915                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
916         follower.handleMessage(leaderActor, installSnapshot);
917
918         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
919                 InstallSnapshotReply.class);
920
921         assertEquals("isSuccess", false, reply.isSuccess());
922         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
923         assertEquals("getTerm", 1, reply.getTerm());
924         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
925
926         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
927     }
928
929     @Test
930     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
931         MockRaftActorContext context = createActorContext();
932
933         Stopwatch stopwatch = Stopwatch.createStarted();
934
935         follower = createBehavior(context);
936
937         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
938
939         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
940
941         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
942     }
943
944     @Test
945     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
946         MockRaftActorContext context = createActorContext();
947         context.setConfigParams(new DefaultConfigParamsImpl(){
948             @Override
949             public FiniteDuration getElectionTimeOutInterval() {
950                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
951             }
952         });
953
954         context.setRaftPolicy(createRaftPolicy(false, false));
955
956         follower = createBehavior(context);
957
958         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
959     }
960
961     @Test
962     public void testElectionScheduledWhenAnyRaftRPCReceived(){
963         MockRaftActorContext context = createActorContext();
964         follower = createBehavior(context);
965         follower.handleMessage(leaderActor, new RaftRPC() {
966             @Override
967             public long getTerm() {
968                 return 100;
969             }
970         });
971         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
972     }
973
974     @Test
975     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
976         MockRaftActorContext context = createActorContext();
977         follower = createBehavior(context);
978         follower.handleMessage(leaderActor, "non-raft-rpc");
979         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
980     }
981
982     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
983         int snapshotLength = bs.size();
984         int start = offset;
985         int size = chunkSize;
986         if (chunkSize > snapshotLength) {
987             size = snapshotLength;
988         } else {
989             if ((start + chunkSize) > snapshotLength) {
990                 size = snapshotLength - start;
991             }
992         }
993         return bs.substring(start, start + size);
994     }
995
996     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
997             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
998         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
999     }
1000
1001     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1002                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1003                                                    boolean expForceInstallSnapshot) {
1004
1005         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1006                 AppendEntriesReply.class);
1007
1008         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1009         assertEquals("getTerm", expTerm, reply.getTerm());
1010         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1011         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1012         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1013         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1014         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1015     }
1016
1017
1018     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1019         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1020                 new MockRaftActorContext.MockPayload(data));
1021     }
1022
1023     private ByteString createSnapshot(){
1024         HashMap<String, String> followerSnapshot = new HashMap<>();
1025         followerSnapshot.put("1", "A");
1026         followerSnapshot.put("2", "B");
1027         followerSnapshot.put("3", "C");
1028
1029         return toByteString(followerSnapshot);
1030     }
1031
1032     @Override
1033     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1034             ActorRef actorRef, RaftRPC rpc) throws Exception {
1035         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1036
1037         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
1038         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1039     }
1040
1041     @Override
1042     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
1043             throws Exception {
1044         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1045         assertEquals("isSuccess", true, reply.isSuccess());
1046     }
1047
1048     private static class TestFollower extends Follower {
1049
1050         int electionTimeoutCount = 0;
1051
1052         public TestFollower(RaftActorContext context) {
1053             super(context);
1054         }
1055
1056         @Override
1057         protected void scheduleElection(FiniteDuration interval) {
1058             electionTimeoutCount++;
1059             super.scheduleElection(interval);
1060         }
1061
1062         public int getElectionTimeoutCount() {
1063             return electionTimeoutCount;
1064         }
1065     }
1066 }